diff options
Diffstat (limited to 'sql/handler.cc')
-rw-r--r-- | sql/handler.cc | 1856 |
1 files changed, 1160 insertions, 696 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index 4e891f5d640..2fad0dca954 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -30,7 +30,6 @@ #include "key.h" // key_copy, key_unpack, key_cmp_if_same, key_cmp #include "sql_table.h" // build_table_filename #include "sql_parse.h" // check_stack_overrun -#include "sql_acl.h" // SUPER_ACL #include "sql_base.h" // TDC_element #include "discover.h" // extension_based_table_discovery, etc #include "log_event.h" // *_rows_log_event @@ -40,10 +39,13 @@ #include "myisam.h" #include "probes_mysql.h" #include <mysql/psi/mysql_table.h> +#include <pfs_transaction_provider.h> +#include <mysql/psi/mysql_transaction.h> #include "debug_sync.h" // DEBUG_SYNC #include "sql_audit.h" #include "ha_sequence.h" #include "rowid_filter.h" +#include "mysys_err.h" #ifdef WITH_PARTITION_STORAGE_ENGINE #include "ha_partition.h" @@ -60,8 +62,42 @@ #include "wsrep_xid.h" #include "wsrep_thd.h" #include "wsrep_trans_observer.h" /* wsrep transaction hooks */ +#include "wsrep_var.h" /* wsrep_hton_check() */ #endif /* WITH_WSREP */ +/** + @def MYSQL_TABLE_LOCK_WAIT + Instrumentation helper for table io_waits. + @param OP the table operation to be performed + @param FLAGS per table operation flags. + @param PAYLOAD the code to instrument. + @sa MYSQL_END_TABLE_WAIT. +*/ +#ifdef HAVE_PSI_TABLE_INTERFACE + #define MYSQL_TABLE_LOCK_WAIT(OP, FLAGS, PAYLOAD) \ + { \ + if (m_psi != NULL) \ + { \ + PSI_table_locker *locker; \ + PSI_table_locker_state state; \ + locker= PSI_TABLE_CALL(start_table_lock_wait) \ + (& state, m_psi, OP, FLAGS, \ + __FILE__, __LINE__); \ + PAYLOAD \ + if (locker != NULL) \ + PSI_TABLE_CALL(end_table_lock_wait)(locker); \ + } \ + else \ + { \ + PAYLOAD \ + } \ + } +#else + #define MYSQL_TABLE_LOCK_WAIT(OP, FLAGS, PAYLOAD) \ + PAYLOAD +#endif + + /* While we have legacy_db_type, we have this array to check for dups and to find handlerton from legacy_db_type. @@ -113,6 +149,44 @@ TYPELIB tx_isolation_typelib= {array_elements(tx_isolation_names)-1,"", static TYPELIB known_extensions= {0,"known_exts", NULL, NULL}; uint known_extensions_id= 0; + +class Table_exists_error_handler : public Internal_error_handler +{ +public: + Table_exists_error_handler() + : m_handled_errors(0), m_unhandled_errors(0) + {} + + bool handle_condition(THD *thd, + uint sql_errno, + const char* sqlstate, + Sql_condition::enum_warning_level *level, + const char* msg, + Sql_condition ** cond_hdl) + { + *cond_hdl= NULL; + if (non_existing_table_error(sql_errno)) + { + m_handled_errors++; + return TRUE; + } + + if (*level == Sql_condition::WARN_LEVEL_ERROR) + m_unhandled_errors++; + return FALSE; + } + + bool safely_trapped_errors() + { + return ((m_handled_errors > 0) && (m_unhandled_errors == 0)); + } + +private: + int m_handled_errors; + int m_unhandled_errors; +}; + + static int commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans); @@ -133,23 +207,6 @@ static plugin_ref ha_default_tmp_plugin(THD *thd) return ha_default_plugin(thd); } -#if defined(WITH_ARIA_STORAGE_ENGINE) && MYSQL_VERSION_ID < 100500 -void ha_maria_implicit_commit(THD *thd, bool new_trn) -{ - if (ha_maria::has_active_transaction(thd)) - { - int error; - MDL_request mdl_request; - mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, MDL_EXPLICIT); - error= thd->mdl_context.acquire_lock(&mdl_request, - thd->variables.lock_wait_timeout); - ha_maria::implicit_commit(thd, new_trn); - if (!error) - thd->mdl_context.release_lock(mdl_request.ticket); - } -} -#endif - /** @brief Return the default storage engine handlerton for thread @@ -199,8 +256,7 @@ plugin_ref ha_resolve_by_name(THD *thd, const LEX_CSTRING *name, plugin_ref plugin; redo: - /* my_strnncoll is a macro and gcc doesn't do early expansion of macro */ - if (thd && !my_charset_latin1.coll->strnncoll(&my_charset_latin1, + if (thd && !my_charset_latin1.strnncoll( (const uchar *)name->str, name->length, (const uchar *)STRING_WITH_LEN("DEFAULT"), 0)) return tmp_table ? ha_default_tmp_plugin(thd) : ha_default_plugin(thd); @@ -222,7 +278,7 @@ redo: */ for (table_alias= sys_table_aliases; table_alias->str; table_alias+= 2) { - if (!my_strnncoll(&my_charset_latin1, + if (!my_charset_latin1.strnncoll( (const uchar *)name->str, name->length, (const uchar *)table_alias->str, table_alias->length)) { @@ -314,7 +370,7 @@ handler *get_new_handler(TABLE_SHARE *share, MEM_ROOT *alloc, DBUG_ENTER("get_new_handler"); DBUG_PRINT("enter", ("alloc: %p", alloc)); - if (db_type && db_type->state == SHOW_OPTION_YES && db_type->create) + if (ha_storage_engine_is_enabled(db_type)) { if ((file= db_type->create(db_type, share, alloc))) file->init(); @@ -379,7 +435,8 @@ int ha_init_errors(void) /* Allocate a pointer array for the error message strings. */ /* Zerofill it to avoid uninitialized gaps. */ - if (! (handler_errmsgs= (const char**) my_malloc(HA_ERR_ERRORS * sizeof(char*), + if (! (handler_errmsgs= (const char**) my_malloc(key_memory_handler_errmsgs, + HA_ERR_ERRORS * sizeof(char*), MYF(MY_WME | MY_ZEROFILL)))) return 1; @@ -490,6 +547,26 @@ static void update_discovery_counters(handlerton *hton, int val) engines_with_discover+= val; } +int ha_drop_table(THD *thd, handlerton *hton, const char *path) +{ + if (ha_check_if_updates_are_ignored(thd, hton, "DROP")) + return 0; // Simulate dropped + return hton->drop_table(hton, path); +} + +static int hton_drop_table(handlerton *hton, const char *path) +{ + char tmp_path[FN_REFLEN]; + handler *file= get_new_handler(nullptr, current_thd->mem_root, hton); + if (!file) + return ENOMEM; + path= get_canonical_filename(file, path, tmp_path); + int error= file->delete_table(path); + delete file; + return error; +} + + int ha_finalize_handlerton(st_plugin_int *plugin) { handlerton *hton= (handlerton *)plugin->data; @@ -499,15 +576,8 @@ int ha_finalize_handlerton(st_plugin_int *plugin) if (!hton) goto end; - switch (hton->state) { - case SHOW_OPTION_NO: - case SHOW_OPTION_DISABLED: - break; - case SHOW_OPTION_YES: - if (installed_htons[hton->db_type] == hton) - installed_htons[hton->db_type]= NULL; - break; - }; + if (installed_htons[hton->db_type] == hton) + installed_htons[hton->db_type]= NULL; if (hton->panic) hton->panic(hton, HA_PANIC_CLOSE); @@ -556,7 +626,7 @@ int ha_initialize_handlerton(st_plugin_int *plugin) DBUG_ENTER("ha_initialize_handlerton"); DBUG_PRINT("plugin", ("initialize plugin: '%s'", plugin->name.str)); - hton= (handlerton *)my_malloc(sizeof(handlerton), + hton= (handlerton *)my_malloc(key_memory_handlerton, sizeof(handlerton), MYF(MY_WME | MY_ZEROFILL)); if (hton == NULL) { @@ -567,6 +637,7 @@ int ha_initialize_handlerton(st_plugin_int *plugin) hton->tablefile_extensions= no_exts; hton->discover_table_names= hton_ext_based_table_discovery; + hton->drop_table= hton_drop_table; hton->slot= HA_SLOT_UNDEF; /* Historical Requirement */ @@ -574,7 +645,7 @@ int ha_initialize_handlerton(st_plugin_int *plugin) if (plugin->plugin->init && plugin->plugin->init(hton)) { sql_print_error("Plugin '%s' init function returned error.", - plugin->name.str); + plugin->name.str); goto err; } @@ -593,90 +664,78 @@ int ha_initialize_handlerton(st_plugin_int *plugin) hton->discover_table_existence= full_discover_for_existence; } - switch (hton->state) { - case SHOW_OPTION_NO: - break; - case SHOW_OPTION_YES: - { - uint tmp; - ulong fslot; - - DBUG_EXECUTE_IF("unstable_db_type", { - static int i= (int) DB_TYPE_FIRST_DYNAMIC; - hton->db_type= (enum legacy_db_type)++i; - }); - - /* now check the db_type for conflict */ - if (hton->db_type <= DB_TYPE_UNKNOWN || - hton->db_type >= DB_TYPE_DEFAULT || - installed_htons[hton->db_type]) - { - int idx= (int) DB_TYPE_FIRST_DYNAMIC; + uint tmp; + ulong fslot; - while (idx < (int) DB_TYPE_DEFAULT && installed_htons[idx]) - idx++; + DBUG_EXECUTE_IF("unstable_db_type", { + static int i= (int) DB_TYPE_FIRST_DYNAMIC; + hton->db_type= (enum legacy_db_type)++i; + }); - if (idx == (int) DB_TYPE_DEFAULT) - { - sql_print_warning("Too many storage engines!"); - goto err_deinit; - } - if (hton->db_type != DB_TYPE_UNKNOWN) - sql_print_warning("Storage engine '%s' has conflicting typecode. " - "Assigning value %d.", plugin->plugin->name, idx); - hton->db_type= (enum legacy_db_type) idx; - } + /* now check the db_type for conflict */ + if (hton->db_type <= DB_TYPE_UNKNOWN || + hton->db_type >= DB_TYPE_DEFAULT || + installed_htons[hton->db_type]) + { + int idx= (int) DB_TYPE_FIRST_DYNAMIC; - /* - In case a plugin is uninstalled and re-installed later, it should - reuse an array slot. Otherwise the number of uninstall/install - cycles would be limited. So look for a free slot. - */ - DBUG_PRINT("plugin", ("total_ha: %lu", total_ha)); - for (fslot= 0; fslot < total_ha; fslot++) - { - if (!hton2plugin[fslot]) - break; - } - if (fslot < total_ha) - hton->slot= fslot; - else - { - if (total_ha >= MAX_HA) - { - sql_print_error("Too many plugins loaded. Limit is %lu. " - "Failed on '%s'", (ulong) MAX_HA, plugin->name.str); - goto err_deinit; - } - hton->slot= total_ha++; - } - installed_htons[hton->db_type]= hton; - tmp= hton->savepoint_offset; - hton->savepoint_offset= savepoint_alloc_size; - savepoint_alloc_size+= tmp; - hton2plugin[hton->slot]=plugin; - if (hton->prepare) - { - total_ha_2pc++; - if (tc_log && tc_log != get_tc_log_implementation()) - { - total_ha_2pc--; - hton->prepare= 0; - push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN, - ER_UNKNOWN_ERROR, - "Cannot enable tc-log at run-time. " - "XA features of %s are disabled", - plugin->name.str); - } - } + while (idx < (int) DB_TYPE_DEFAULT && installed_htons[idx]) + idx++; + + if (idx == (int) DB_TYPE_DEFAULT) + { + sql_print_warning("Too many storage engines!"); + goto err_deinit; + } + if (hton->db_type != DB_TYPE_UNKNOWN) + sql_print_warning("Storage engine '%s' has conflicting typecode. " + "Assigning value %d.", plugin->plugin->name, idx); + hton->db_type= (enum legacy_db_type) idx; + } + + /* + In case a plugin is uninstalled and re-installed later, it should + reuse an array slot. Otherwise the number of uninstall/install + cycles would be limited. So look for a free slot. + */ + DBUG_PRINT("plugin", ("total_ha: %lu", total_ha)); + for (fslot= 0; fslot < total_ha; fslot++) + { + if (!hton2plugin[fslot]) break; + } + if (fslot < total_ha) + hton->slot= fslot; + else + { + if (total_ha >= MAX_HA) + { + sql_print_error("Too many plugins loaded. Limit is %lu. " + "Failed on '%s'", (ulong) MAX_HA, plugin->name.str); + goto err_deinit; } - /* fall through */ - default: - hton->state= SHOW_OPTION_DISABLED; - break; + hton->slot= total_ha++; } - + installed_htons[hton->db_type]= hton; + tmp= hton->savepoint_offset; + hton->savepoint_offset= savepoint_alloc_size; + savepoint_alloc_size+= tmp; + hton2plugin[hton->slot]=plugin; + if (hton->prepare) + { + total_ha_2pc++; + if (tc_log && tc_log != get_tc_log_implementation()) + { + total_ha_2pc--; + hton->prepare= 0; + push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN, + ER_UNKNOWN_ERROR, + "Cannot enable tc-log at run-time. " + "XA features of %s are disabled", + plugin->name.str); + } + } + /* This is entirely for legacy. We will create a new "disk based" hton and a "memory" hton which will be configurable longterm. We should be able to @@ -711,10 +770,10 @@ err_deinit: */ if (plugin->plugin->deinit) (void) plugin->plugin->deinit(NULL); - + err: #ifdef DBUG_ASSERT_EXISTS - if (hton->prepare && hton->state == SHOW_OPTION_YES) + if (hton->prepare) failed_ha_2pc++; #endif my_free(hton); @@ -759,7 +818,7 @@ static my_bool dropdb_handlerton(THD *unused1, plugin_ref plugin, void *path) { handlerton *hton= plugin_hton(plugin); - if (hton->state == SHOW_OPTION_YES && hton->drop_database) + if (hton->drop_database) hton->drop_database(hton, (char *)path); return FALSE; } @@ -775,7 +834,7 @@ static my_bool checkpoint_state_handlerton(THD *unused1, plugin_ref plugin, void *disable) { handlerton *hton= plugin_hton(plugin); - if (hton->state == SHOW_OPTION_YES && hton->checkpoint_state) + if (hton->checkpoint_state) hton->checkpoint_state(hton, (int) *(bool*) disable); return FALSE; } @@ -797,12 +856,12 @@ static my_bool commit_checkpoint_request_handlerton(THD *unused1, plugin_ref plu { st_commit_checkpoint_request *st= (st_commit_checkpoint_request *)data; handlerton *hton= plugin_hton(plugin); - if (hton->state == SHOW_OPTION_YES && hton->commit_checkpoint_request) + if (hton->commit_checkpoint_request) { void *cookie= st->cookie; if (st->pre_hook) (*st->pre_hook)(cookie); - (*hton->commit_checkpoint_request)(hton, cookie); + (*hton->commit_checkpoint_request)(cookie); } return FALSE; } @@ -824,34 +883,29 @@ ha_commit_checkpoint_request(void *cookie, void (*pre_hook)(void *)) } - -static my_bool closecon_handlerton(THD *thd, plugin_ref plugin, - void *unused) -{ - handlerton *hton= plugin_hton(plugin); - /* - there's no need to rollback here as all transactions must - be rolled back already - */ - if (hton->state == SHOW_OPTION_YES && thd_get_ha_data(thd, hton)) - { - if (hton->close_connection) - hton->close_connection(hton, thd); - /* make sure ha_data is reset and ha_data_lock is released */ - thd_set_ha_data(thd, hton, NULL); - } - return FALSE; -} - /** @note don't bother to rollback here, it's done already + + there's no need to rollback here as all transactions must + be rolled back already */ void ha_close_connection(THD* thd) { - plugin_foreach_with_mask(thd, closecon_handlerton, - MYSQL_STORAGE_ENGINE_PLUGIN, - PLUGIN_IS_DELETED|PLUGIN_IS_READY, 0); + for (auto i= 0; i < MAX_HA; i++) + { + if (thd->ha_data[i].lock) + { + handlerton *hton= plugin_hton(thd->ha_data[i].lock); + if (hton->close_connection) + hton->close_connection(hton, thd); + /* make sure SE didn't reset ha_data in close_connection() */ + DBUG_ASSERT(thd->ha_data[i].lock); + /* make sure ha_data is reset and ha_data_lock is released */ + thd_set_ha_data(thd, hton, 0); + } + DBUG_ASSERT(!thd->ha_data[i].ha_ptr); + } } static my_bool kill_handlerton(THD *thd, plugin_ref plugin, @@ -860,8 +914,7 @@ static my_bool kill_handlerton(THD *thd, plugin_ref plugin, handlerton *hton= plugin_hton(plugin); mysql_mutex_assert_owner(&thd->LOCK_thd_data); - if (hton->state == SHOW_OPTION_YES && hton->kill_query && - thd_get_ha_data(thd, hton)) + if (hton->kill_query && thd_get_ha_data(thd, hton)) hton->kill_query(hton, thd, *(enum thd_kill_levels *) level); return FALSE; } @@ -882,7 +935,7 @@ static my_bool plugin_prepare_for_backup(THD *unused1, plugin_ref plugin, void *not_used) { handlerton *hton= plugin_hton(plugin); - if (hton->state == SHOW_OPTION_YES && hton->prepare_for_backup) + if (hton->prepare_for_backup) hton->prepare_for_backup(); return FALSE; } @@ -898,7 +951,7 @@ static my_bool plugin_end_backup(THD *unused1, plugin_ref plugin, void *not_used) { handlerton *hton= plugin_hton(plugin); - if (hton->state == SHOW_OPTION_YES && hton->end_backup) + if (hton->end_backup) hton->end_backup(); return FALSE; } @@ -911,6 +964,33 @@ void ha_end_backup() } +/* + Inform plugin of the server shutdown. + Called after all connections are down. + + Under some circumstances, storage engine might need to + so some work, before deinit() can be safely called. + (an example is Innodb purge that might call into server + to calculate virtual columns, which might potentially also + invoke other plugins, such as audit +*/ +static my_bool plugin_pre_shutdown(THD *, plugin_ref plugin, void *) +{ + handlerton *hton= plugin_hton(plugin); + if (hton->pre_shutdown) + hton->pre_shutdown(); + return FALSE; +} + + +void ha_pre_shutdown() +{ + plugin_foreach_with_mask(0, plugin_pre_shutdown, + MYSQL_STORAGE_ENGINE_PLUGIN, + PLUGIN_IS_DELETED | PLUGIN_IS_READY, 0); +} + + /* ======================================================================== ======================= TRANSACTIONS ===================================*/ @@ -1217,7 +1297,7 @@ void ha_end_backup() times per transaction. */ -void trans_register_ha(THD *thd, bool all, handlerton *ht_arg) +void trans_register_ha(THD *thd, bool all, handlerton *ht_arg, ulonglong trxid) { THD_TRANS *trans; Ha_trx_info *ha_info; @@ -1226,14 +1306,14 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht_arg) if (all) { - trans= &thd->transaction.all; + trans= &thd->transaction->all; thd->server_status|= SERVER_STATUS_IN_TRANS; if (thd->tx_read_only) thd->server_status|= SERVER_STATUS_IN_TRANS_READONLY; DBUG_PRINT("info", ("setting SERVER_STATUS_IN_TRANS")); } else - trans= &thd->transaction.stmt; + trans= &thd->transaction->stmt; ha_info= thd->ha_data[ht_arg->slot].ha_info + (all ? 1 : 0); @@ -1245,9 +1325,28 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht_arg) trans->no_2pc|=(ht_arg->prepare==0); /* Set implicit xid even if there's explicit XA, it will be ignored anyway. */ - if (thd->transaction.implicit_xid.is_null()) - thd->transaction.implicit_xid.set(thd->query_id); + if (thd->transaction->implicit_xid.is_null()) + thd->transaction->implicit_xid.set(thd->query_id); + +/* + Register transaction start in performance schema if not done already. + By doing this, we handle cases when the transaction is started implicitly in + autocommit=0 mode, and cases when we are in normal autocommit=1 mode and the + executed statement is a single-statement transaction. + Explicitly started transactions are handled in trans_begin(). + + Do not register transactions in which binary log is the only participating + transactional storage engine. +*/ + if (thd->m_transaction_psi == NULL && ht_arg->db_type != DB_TYPE_BINLOG) + { + thd->m_transaction_psi= MYSQL_START_TRANSACTION(&thd->m_transaction_state, + thd->get_xid(), trxid, thd->tx_isolation, thd->tx_read_only, + !thd->in_multi_stmt_transaction_mode()); + DEBUG_SYNC(thd, "after_set_transaction_psi_before_set_transaction_gtid"); + //gtid_set_performance_schema_values(thd); + } DBUG_VOID_RETURN; } @@ -1290,7 +1389,7 @@ static int prepare_or_error(handlerton *ht, THD *thd, bool all) int ha_prepare(THD *thd) { int error=0, all=1; - THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; + THD_TRANS *trans=all ? &thd->transaction->all : &thd->transaction->stmt; Ha_trx_info *ha_info= trans->ha_list; DBUG_ENTER("ha_prepare"); @@ -1317,11 +1416,42 @@ int ha_prepare(THD *thd) } } + + DEBUG_SYNC(thd, "at_unlog_xa_prepare"); + + if (tc_log->unlog_xa_prepare(thd, all)) + { + ha_rollback_trans(thd, all); + error=1; + } } DBUG_RETURN(error); } +/* + Like ha_check_and_coalesce_trx_read_only to return counted number of + read-write transaction participants limited to two, but works in the 'all' + context. + Also returns the last found rw ha_info through the 2nd argument. +*/ +uint ha_count_rw_all(THD *thd, Ha_trx_info **ptr_ha_info) +{ + unsigned rw_ha_count= 0; + + for (auto ha_info= thd->transaction->all.ha_list; ha_info; + ha_info= ha_info->next()) + { + if (ha_info->is_trx_read_write()) + { + *ptr_ha_info= ha_info; + if (++rw_ha_count > 1) + break; + } + } + return rw_ha_count; +} + /** Check if we can skip the two-phase commit. @@ -1404,7 +1534,7 @@ int ha_commit_trans(THD *thd, bool all) 'all' means that this is either an explicit commit issued by user, or an implicit commit issued by a DDL. */ - THD_TRANS *trans= all ? &thd->transaction.all : &thd->transaction.stmt; + THD_TRANS *trans= all ? &thd->transaction->all : &thd->transaction->stmt; /* "real" is a nick name for a transaction for which a commit will make persistent changes. E.g. a 'stmt' transaction inside an 'all' @@ -1412,7 +1542,7 @@ int ha_commit_trans(THD *thd, bool all) the changes are not durable as they might be rolled back if the enclosing 'all' transaction is rolled back. */ - bool is_real_trans= ((all || thd->transaction.all.ha_list == 0) && + bool is_real_trans= ((all || thd->transaction->all.ha_list == 0) && !(thd->variables.option_bits & OPTION_GTID_BEGIN)); Ha_trx_info *ha_info= trans->ha_list; bool need_prepare_ordered, need_commit_ordered; @@ -1439,8 +1569,8 @@ int ha_commit_trans(THD *thd, bool all) flags will not get propagated to its normal transaction's counterpart. */ - DBUG_ASSERT(thd->transaction.stmt.ha_list == NULL || - trans == &thd->transaction.stmt); + DBUG_ASSERT(thd->transaction->stmt.ha_list == NULL || + trans == &thd->transaction->stmt); if (thd->in_sub_stmt) { @@ -1469,16 +1599,17 @@ int ha_commit_trans(THD *thd, bool all) Free resources and perform other cleanup even for 'empty' transactions. */ if (is_real_trans) - thd->transaction.cleanup(); + { + thd->transaction->cleanup(); + MYSQL_COMMIT_TRANSACTION(thd->m_transaction_psi); + thd->m_transaction_psi= NULL; + } #ifdef WITH_WSREP if (wsrep_is_active(thd) && is_real_trans && !error) - { wsrep_commit_empty(thd, all); - } #endif /* WITH_WSREP */ - ha_maria_implicit_commit(thd, TRUE); - DBUG_RETURN(error); + DBUG_RETURN(0); } DBUG_EXECUTE_IF("crash_commit_before", DBUG_SUICIDE();); @@ -1505,29 +1636,26 @@ int ha_commit_trans(THD *thd, bool all) We allow the owner of FTWRL to COMMIT; we assume that it knows what it does. */ - mdl_backup.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, MDL_EXPLICIT); + MDL_REQUEST_INIT(&mdl_backup, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, + MDL_EXPLICIT); if (!WSREP(thd)) { if (thd->mdl_context.acquire_lock(&mdl_backup, thd->variables.lock_wait_timeout)) { + my_error(ER_ERROR_DURING_COMMIT, MYF(0), 1); ha_rollback_trans(thd, all); DBUG_RETURN(1); } thd->backup_commit_lock= &mdl_backup; } DEBUG_SYNC(thd, "ha_commit_trans_after_acquire_commit_lock"); - - /* Use shortcut as we already have the MDL_BACKUP_COMMIT lock */ - ha_maria::implicit_commit(thd, TRUE); } - else - ha_maria_implicit_commit(thd, TRUE); if (rw_trans && opt_readonly && - !(thd->security_ctx->master_access & SUPER_ACL) && + !(thd->security_ctx->master_access & PRIV_IGNORE_READ_ONLY) && !thd->slave_thread) { my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only"); @@ -1575,7 +1703,7 @@ int ha_commit_trans(THD *thd, bool all) // Here, the call will not commit inside InnoDB. It is only working // around closing thd->transaction.stmt open by TR_table::open(). if (all) - commit_one_phase_2(thd, false, &thd->transaction.stmt, false); + commit_one_phase_2(thd, false, &thd->transaction->stmt, false); #ifdef WITH_WSREP thd->variables.wsrep_on= saved_wsrep_on; #endif @@ -1600,18 +1728,15 @@ int ha_commit_trans(THD *thd, bool all) #endif /* WITH_WSREP */ error= ha_commit_one_phase(thd, all); #ifdef WITH_WSREP - if (run_wsrep_hooks) - error= error || wsrep_after_commit(thd, all); + // Here in case of error we must return 2 for inconsistency + if (run_wsrep_hooks && !error) + error= wsrep_after_commit(thd, all) ? 2 : 0; #endif /* WITH_WSREP */ goto done; } need_prepare_ordered= FALSE; need_commit_ordered= FALSE; - DBUG_ASSERT(thd->transaction.implicit_xid.get_my_xid() == - thd->transaction.implicit_xid.quick_get_my_xid()); - xid= thd->transaction.xid_state.is_explicit_XA() ? 0 : - thd->transaction.implicit_xid.quick_get_my_xid(); for (Ha_trx_info *hi= ha_info; hi; hi= hi->next()) { @@ -1636,6 +1761,18 @@ int ha_commit_trans(THD *thd, bool all) DEBUG_SYNC(thd, "ha_commit_trans_after_prepare"); DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE();); + if (!is_real_trans) + { + error= commit_one_phase_2(thd, all, trans, is_real_trans); + goto done; + } + + DBUG_ASSERT(thd->transaction->implicit_xid.get_my_xid() == + thd->transaction->implicit_xid.quick_get_my_xid()); + DBUG_ASSERT(!thd->transaction->xid_state.is_explicit_XA() || + thd->lex->xa_opt == XA_ONE_PHASE); + xid= thd->transaction->implicit_xid.quick_get_my_xid(); + #ifdef WITH_WSREP if (run_wsrep_hooks && !error) { @@ -1646,14 +1783,6 @@ int ha_commit_trans(THD *thd, bool all) xid= s.get(); } } -#endif /* WITH_WSREP */ - - if (!is_real_trans) - { - error= commit_one_phase_2(thd, all, trans, is_real_trans); - goto done; - } -#ifdef WITH_WSREP if (run_wsrep_hooks && (error = wsrep_before_commit(thd, all))) goto wsrep_err; #endif /* WITH_WSREP */ @@ -1670,8 +1799,10 @@ int ha_commit_trans(THD *thd, bool all) error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0; #ifdef WITH_WSREP - if (run_wsrep_hooks && (error || (error = wsrep_after_commit(thd, all)))) + if (run_wsrep_hooks && + (error || (error = wsrep_after_commit(thd, all)))) { + error = 2; mysql_mutex_lock(&thd->LOCK_thd_data); if (wsrep_must_abort(thd)) { @@ -1684,12 +1815,15 @@ int ha_commit_trans(THD *thd, bool all) #endif /* WITH_WSREP */ DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE();); if (tc_log->unlog(cookie, xid)) - { error= 2; /* Error during commit */ - goto end; - } done: + if (is_real_trans) + { + MYSQL_COMMIT_TRANSACTION(thd->m_transaction_psi); + thd->m_transaction_psi= NULL; + } + DBUG_EXECUTE_IF("crash_commit_after", DBUG_SUICIDE();); mysql_mutex_assert_not_owner(&LOCK_prepare_ordered); @@ -1727,6 +1861,12 @@ err: ha_rollback_trans(thd, all); else { + /* + We are not really doing a rollback here, but the code in trans_commit() + requres that m_transaction_psi is 0 when we return from this function. + */ + MYSQL_ROLLBACK_TRANSACTION(thd->m_transaction_psi); + thd->m_transaction_psi= NULL; WSREP_DEBUG("rollback skipped %p %d",thd->rgi_slave, thd->rgi_slave->is_parallel_exec); } @@ -1768,7 +1908,7 @@ end: int ha_commit_one_phase(THD *thd, bool all) { - THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; + THD_TRANS *trans=all ? &thd->transaction->all : &thd->transaction->stmt; /* "real" is a nick name for a transaction for which a commit will make persistent changes. E.g. a 'stmt' transaction inside a 'all' @@ -1782,7 +1922,7 @@ int ha_commit_one_phase(THD *thd, bool all) ha_commit_one_phase() can be called with an empty transaction.all.ha_list, see why in trans_register_ha()). */ - bool is_real_trans= ((all || thd->transaction.all.ha_list == 0) && + bool is_real_trans= ((all || thd->transaction->all.ha_list == 0) && !(thd->variables.option_bits & OPTION_GTID_BEGIN)); int res; DBUG_ENTER("ha_commit_one_phase"); @@ -1830,8 +1970,8 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) if (all) { #ifdef HAVE_QUERY_CACHE - if (thd->transaction.changed_tables) - query_cache.invalidate(thd, thd->transaction.changed_tables); + if (thd->transaction->changed_tables) + query_cache.invalidate(thd, thd->transaction->changed_tables); #endif } } @@ -1840,7 +1980,7 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) if (is_real_trans) { thd->has_waiter= false; - thd->transaction.cleanup(); + thd->transaction->cleanup(); if (count >= 2) statistic_increment(transactions_multi_engine, LOCK_status); } @@ -1852,7 +1992,7 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) int ha_rollback_trans(THD *thd, bool all) { int error=0; - THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; + THD_TRANS *trans=all ? &thd->transaction->all : &thd->transaction->stmt; Ha_trx_info *ha_info= trans->ha_list, *ha_info_next; /* "real" is a nick name for a transaction for which a commit will @@ -1867,15 +2007,15 @@ int ha_rollback_trans(THD *thd, bool all) ha_commit_one_phase() is called with an empty transaction.all.ha_list, see why in trans_register_ha()). */ - bool is_real_trans=all || thd->transaction.all.ha_list == 0; + bool is_real_trans=all || thd->transaction->all.ha_list == 0; DBUG_ENTER("ha_rollback_trans"); /* We must not rollback the normal transaction if a statement transaction is pending. */ - DBUG_ASSERT(thd->transaction.stmt.ha_list == NULL || - trans == &thd->transaction.stmt); + DBUG_ASSERT(thd->transaction->stmt.ha_list == NULL || + trans == &thd->transaction->stmt); #ifdef HAVE_REPLICATION if (is_real_trans) @@ -1891,7 +2031,8 @@ int ha_rollback_trans(THD *thd, bool all) rollback without signalling following transactions. And in release builds, we explicitly do the signalling before rolling back. */ - DBUG_ASSERT(!(thd->rgi_slave && thd->rgi_slave->did_mark_start_commit)); + DBUG_ASSERT(!(thd->rgi_slave && thd->rgi_slave->did_mark_start_commit) || + thd->transaction->xid_state.is_explicit_XA()); if (thd->rgi_slave && thd->rgi_slave->did_mark_start_commit) thd->rgi_slave->unmark_start_commit(); } @@ -1925,7 +2066,8 @@ int ha_rollback_trans(THD *thd, bool all) int err; handlerton *ht= ha_info->ht(); if ((err= ht->rollback(ht, thd, all))) - { // cannot happen + { + // cannot happen my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err); error=1; #ifdef WITH_WSREP @@ -1951,6 +2093,13 @@ int ha_rollback_trans(THD *thd, bool all) } (void) wsrep_after_rollback(thd, all); #endif /* WITH_WSREP */ + + if (all || !thd->in_active_multi_stmt_transaction()) + { + MYSQL_ROLLBACK_TRANSACTION(thd->m_transaction_psi); + thd->m_transaction_psi= NULL; + } + /* Always cleanup. Even if nht==0. There may be savepoints. */ if (is_real_trans) { @@ -1959,10 +2108,10 @@ int ha_rollback_trans(THD *thd, bool all) transaction hasn't been started in any transactional storage engine. */ if (thd->transaction_rollback_request) - thd->transaction.xid_state.set_error(thd->get_stmt_da()->sql_errno()); + thd->transaction->xid_state.set_error(thd->get_stmt_da()->sql_errno()); thd->has_waiter= false; - thd->transaction.cleanup(); + thd->transaction->cleanup(); } if (all) thd->transaction_rollback_request= FALSE; @@ -1980,7 +2129,7 @@ int ha_rollback_trans(THD *thd, bool all) it doesn't matter if a warning is pushed to a system thread or not: No one will see it... */ - if (is_real_trans && thd->transaction.all.modified_non_trans_table && + if (is_real_trans && thd->transaction->all.modified_non_trans_table && !thd->slave_thread && thd->killed < KILL_CONNECTION) push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_WARNING_NOT_COMPLETE_ROLLBACK, @@ -2001,7 +2150,7 @@ static my_bool xacommit_handlerton(THD *unused1, plugin_ref plugin, void *arg) { handlerton *hton= plugin_hton(plugin); - if (hton->state == SHOW_OPTION_YES && hton->recover) + if (hton->recover) { hton->commit_by_xid(hton, ((struct xahton_st *)arg)->xid); ((struct xahton_st *)arg)->result= 0; @@ -2013,7 +2162,7 @@ static my_bool xarollback_handlerton(THD *unused1, plugin_ref plugin, void *arg) { handlerton *hton= plugin_hton(plugin); - if (hton->state == SHOW_OPTION_YES && hton->recover) + if (hton->recover) { hton->rollback_by_xid(hton, ((struct xahton_st *)arg)->xid); ((struct xahton_st *)arg)->result= 0; @@ -2044,7 +2193,7 @@ int ha_commit_or_rollback_by_xid(XID *xid, bool commit) @return pointer to converted string @note This does not need to be multi-byte safe or anything */ -char *xid_to_str(char *buf, const XID &xid) +static char *xid_to_str(char *buf, const XID &xid) { int i; char *s=buf; @@ -2099,7 +2248,7 @@ static my_xid wsrep_order_and_check_continuity(XID *list, int len) { #ifdef WITH_WSREP wsrep_sort_xid_array(list, len); - wsrep::gtid cur_position= wsrep_get_SE_checkpoint(); + wsrep::gtid cur_position= wsrep_get_SE_checkpoint<wsrep::gtid>(); long long cur_seqno= cur_position.seqno().get(); for (int i= 0; i < len; ++i) { @@ -2150,7 +2299,7 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin, struct xarecover_st *info= (struct xarecover_st *) arg; int got; - if (hton->state == SHOW_OPTION_YES && hton->recover) + if (hton->recover) { while ((got= hton->recover(hton, info->list, info->len)) > 0 ) { @@ -2265,7 +2414,7 @@ int ha_recover(HASH *commit_list) info.list==0 && info.len > MIN_XID_LIST_SIZE; info.len/=2) { DBUG_EXECUTE_IF("min_xa_len", info.len = 16;); - info.list=(XID *)my_malloc(info.len*sizeof(XID), MYF(0)); + info.list=(XID *)my_malloc(key_memory_XID, info.len*sizeof(XID), MYF(0)); } if (!info.list) { @@ -2302,8 +2451,7 @@ int ha_recover(HASH *commit_list) Called by engine to notify TC that a new commit checkpoint has been reached. See comments on handlerton method commit_checkpoint_request() for details. */ -void -commit_checkpoint_notify_ha(handlerton *hton, void *cookie) +void commit_checkpoint_notify_ha(void *cookie) { tc_log->commit_checkpoint_notify(cookie); } @@ -2322,8 +2470,8 @@ commit_checkpoint_notify_ha(handlerton *hton, void *cookie) bool ha_rollback_to_savepoint_can_release_mdl(THD *thd) { Ha_trx_info *ha_info; - THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction.stmt : - &thd->transaction.all); + THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction->stmt : + &thd->transaction->all); DBUG_ENTER("ha_rollback_to_savepoint_can_release_mdl"); @@ -2347,8 +2495,8 @@ bool ha_rollback_to_savepoint_can_release_mdl(THD *thd) int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv) { int error=0; - THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction.stmt : - &thd->transaction.all); + THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction->stmt : + &thd->transaction->all); Ha_trx_info *ha_info, *ha_info_next; DBUG_ENTER("ha_rollback_to_savepoint"); @@ -2407,6 +2555,10 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv) ha_info->reset(); /* keep it conveniently zero-filled */ } trans->ha_list= sv->ha_list; + + if (thd->m_transaction_psi != NULL) + MYSQL_INC_TRANSACTION_ROLLBACK_TO_SAVEPOINT(thd->m_transaction_psi, 1); + DBUG_RETURN(error); } @@ -2429,8 +2581,8 @@ int ha_savepoint(THD *thd, SAVEPOINT *sv) } #endif /* WITH_WSREP */ int error=0; - THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction.stmt : - &thd->transaction.all); + THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction->stmt : + &thd->transaction->all); Ha_trx_info *ha_info= trans->ha_list; DBUG_ENTER("ha_savepoint"); @@ -2458,6 +2610,9 @@ int ha_savepoint(THD *thd, SAVEPOINT *sv) */ sv->ha_list= trans->ha_list; + if (!error && thd->m_transaction_psi != NULL) + MYSQL_INC_TRANSACTION_SAVEPOINTS(thd->m_transaction_psi, 1); + DBUG_RETURN(error); } @@ -2482,6 +2637,10 @@ int ha_release_savepoint(THD *thd, SAVEPOINT *sv) error=1; } } + + if (thd->m_transaction_psi != NULL) + MYSQL_INC_TRANSACTION_RELEASE_SAVEPOINT(thd->m_transaction_psi, 1); + DBUG_RETURN(error); } @@ -2490,8 +2649,7 @@ static my_bool snapshot_handlerton(THD *thd, plugin_ref plugin, void *arg) { handlerton *hton= plugin_hton(plugin); - if (hton->state == SHOW_OPTION_YES && - hton->start_consistent_snapshot) + if (hton->start_consistent_snapshot) { if (hton->start_consistent_snapshot(hton, thd)) return TRUE; @@ -2537,28 +2695,14 @@ static my_bool flush_handlerton(THD *thd, plugin_ref plugin, void *arg) { handlerton *hton= plugin_hton(plugin); - if (hton->state == SHOW_OPTION_YES && hton->flush_logs && - hton->flush_logs(hton)) - return TRUE; - return FALSE; + return hton->flush_logs && hton->flush_logs(hton); } -bool ha_flush_logs(handlerton *db_type) +bool ha_flush_logs() { - if (db_type == NULL) - { - if (plugin_foreach(NULL, flush_handlerton, - MYSQL_STORAGE_ENGINE_PLUGIN, 0)) - return TRUE; - } - else - { - if (db_type->state != SHOW_OPTION_YES || - (db_type->flush_logs && db_type->flush_logs(db_type))) - return TRUE; - } - return FALSE; + return plugin_foreach(NULL, flush_handlerton, + MYSQL_STORAGE_ENGINE_PLUGIN, 0); } @@ -2610,57 +2754,72 @@ const char *get_canonical_filename(handler *file, const char *path, } -/** delete a table in the engine +/** + Delete a table in the engine + + @return 0 Table was deleted + @return -1 Table didn't exists, no error given + @return # Error from table handler @note ENOENT and HA_ERR_NO_SUCH_TABLE are not considered errors. - The .frm file will be deleted only if we return 0. + The .frm file should be deleted by the caller only if we return <= 0. */ -int ha_delete_table(THD *thd, handlerton *table_type, const char *path, - const LEX_CSTRING *db, const LEX_CSTRING *alias, bool generate_warning) + +int ha_delete_table(THD *thd, handlerton *hton, const char *path, + const LEX_CSTRING *db, const LEX_CSTRING *alias, + bool generate_warning) { - handler *file; - char tmp_path[FN_REFLEN]; int error; - TABLE dummy_table; - TABLE_SHARE dummy_share; + bool is_error= thd->is_error(); DBUG_ENTER("ha_delete_table"); - /* table_type is NULL in ALTER TABLE when renaming only .frm files */ - if (table_type == NULL || table_type == view_pseudo_hton || - ! (file=get_new_handler((TABLE_SHARE*)0, thd->mem_root, table_type))) + /* hton is NULL in ALTER TABLE when renaming only .frm files */ + if (hton == NULL || hton == view_pseudo_hton) DBUG_RETURN(0); - bzero((char*) &dummy_table, sizeof(dummy_table)); - bzero((char*) &dummy_share, sizeof(dummy_share)); - dummy_table.s= &dummy_share; + if (ha_check_if_updates_are_ignored(thd, hton, "DROP")) + DBUG_RETURN(0); - path= get_canonical_filename(file, path, tmp_path); - if (unlikely((error= file->ha_delete_table(path)))) + error= hton->drop_table(hton, path); + if (error > 0) { /* - it's not an error if the table doesn't exist in the engine. + It's not an error if the table doesn't exist in the engine. warn the user, but still report DROP being a success */ - bool intercept= error == ENOENT || error == HA_ERR_NO_SUCH_TABLE; + bool intercept= non_existing_table_error(error); - if (!intercept || generate_warning) + if ((!intercept || generate_warning) && ! thd->is_error()) { - /* Fill up strucutures that print_error may need */ - dummy_share.path.str= (char*) path; - dummy_share.path.length= strlen(path); - dummy_share.normalized_path= dummy_share.path; - dummy_share.db= *db; - dummy_share.table_name= *alias; - dummy_table.alias.set(alias->str, alias->length, table_alias_charset); - file->change_table_ptr(&dummy_table, &dummy_share); - file->print_error(error, MYF(intercept ? ME_WARNING : 0)); + TABLE dummy_table; + TABLE_SHARE dummy_share; + handler *file= get_new_handler(nullptr, thd->mem_root, hton); + if (file) { + bzero((char*) &dummy_table, sizeof(dummy_table)); + bzero((char*) &dummy_share, sizeof(dummy_share)); + dummy_share.path.str= (char*) path; + dummy_share.path.length= strlen(path); + dummy_share.normalized_path= dummy_share.path; + dummy_share.db= *db; + dummy_share.table_name= *alias; + dummy_table.s= &dummy_share; + dummy_table.alias.set(alias->str, alias->length, table_alias_charset); + file->change_table_ptr(&dummy_table, &dummy_share); + file->print_error(error, MYF(intercept ? ME_WARNING : 0)); + delete file; + } } if (intercept) - error= 0; + { + /* Clear error if we got it in this function */ + if (!is_error) + thd->clear_error(); + error= -1; + } } - delete file; - + if (error) + DBUG_PRINT("exit", ("error: %d", error)); DBUG_RETURN(error); } @@ -2706,6 +2865,24 @@ err: return NULL; } + +/** + clone of current handler. + + Creates a clone of handler used for unique hash key and WITHOUT OVERLAPS. + @return error code +*/ +int handler::create_lookup_handler() +{ + handler *tmp; + if (lookup_handler != this) + return 0; + if (!(tmp= clone(table->s->normalized_path.str, table->in_use->mem_root))) + return 1; + lookup_handler= tmp; + return lookup_handler->ha_external_lock(table->in_use, F_RDLCK); +} + LEX_CSTRING *handler::engine_name() { return hton_name(ht); @@ -2724,22 +2901,18 @@ double handler::keyread_time(uint index, uint ranges, ha_rows rows) { DBUG_ASSERT(ranges == 0 || ranges == 1); size_t len= table->key_info[index].key_length + ref_length; - if (index == table->s->primary_key && table->file->primary_key_is_clustered()) + if (table->file->is_clustering_key(index)) len= table->s->stored_rec_length; double cost= (double)rows*len/(stats.block_size+1)*IDX_BLOCK_COPY_COST; if (ranges) { - uint keys_per_block= (uint) (stats.block_size/2.0/len+1); - ulonglong blocks= !rows ? 0 : (rows-1) / keys_per_block + 1; + uint keys_per_block= (uint) (stats.block_size*3/4/len+1); + ulonglong blocks= (rows+ keys_per_block- 1)/keys_per_block; cost+= blocks; } return cost; } -void **handler::ha_data(THD *thd) const -{ - return thd_ha_data(thd, ht); -} THD *handler::ha_thd(void) const { @@ -2766,6 +2939,30 @@ void handler::rebind_psi() } +void handler::start_psi_batch_mode() +{ +#ifdef HAVE_PSI_TABLE_INTERFACE + DBUG_ASSERT(m_psi_batch_mode == PSI_BATCH_MODE_NONE); + DBUG_ASSERT(m_psi_locker == NULL); + m_psi_batch_mode= PSI_BATCH_MODE_STARTING; + m_psi_numrows= 0; +#endif +} + +void handler::end_psi_batch_mode() +{ +#ifdef HAVE_PSI_TABLE_INTERFACE + DBUG_ASSERT(m_psi_batch_mode != PSI_BATCH_MODE_NONE); + if (m_psi_locker != NULL) + { + DBUG_ASSERT(m_psi_batch_mode == PSI_BATCH_MODE_STARTED); + PSI_TABLE_CALL(end_table_io_wait)(m_psi_locker, m_psi_numrows); + m_psi_locker= NULL; + } + m_psi_batch_mode= PSI_BATCH_MODE_NONE; +#endif +} + PSI_table_share *handler::ha_table_share_psi() const { return table_share->m_psi; @@ -2842,7 +3039,6 @@ int handler::ha_open(TABLE *table_arg, const char *name, int mode, } reset_statistics(); internal_tmp_table= MY_TEST(test_if_locked & HA_OPEN_INTERNAL_TABLE); - DBUG_RETURN(error); } @@ -2855,11 +3051,15 @@ int handler::ha_close(void) */ if (table->in_use) status_var_add(table->in_use->status_var.rows_tmp_read, rows_tmp_read); - PSI_CALL_close_table(m_psi); + PSI_CALL_close_table(table_share, m_psi); m_psi= NULL; /* instrumentation handle, invalid after close_table() */ + DBUG_ASSERT(m_psi_batch_mode == PSI_BATCH_MODE_NONE); + DBUG_ASSERT(m_psi_locker == NULL); /* Detach from ANALYZE tracker */ tracker= NULL; + /* We use ref as way to check that open succeded */ + ref= 0; DBUG_ASSERT(m_lock_type == F_UNLCK); DBUG_ASSERT(inited == NONE); @@ -2877,7 +3077,7 @@ int handler::ha_rnd_next(uchar *buf) do { - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, MAX_KEY, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, MAX_KEY, result, { result= rnd_next(buf); }) if (result != HA_ERR_RECORD_DELETED) break; @@ -2909,7 +3109,7 @@ int handler::ha_rnd_pos(uchar *buf, uchar *pos) m_lock_type != F_UNLCK); DBUG_ASSERT(inited == RND); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, MAX_KEY, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, MAX_KEY, result, { result= rnd_pos(buf, pos); }) increment_statistics(&SSV::ha_read_rnd_count); if (result == HA_ERR_RECORD_DELETED) @@ -2934,7 +3134,7 @@ int handler::ha_index_read_map(uchar *buf, const uchar *key, m_lock_type != F_UNLCK); DBUG_ASSERT(inited==INDEX); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result, { result= index_read_map(buf, key, keypart_map, find_flag); }) increment_statistics(&SSV::ha_read_key_count); if (!result) @@ -2962,7 +3162,7 @@ int handler::ha_index_read_idx_map(uchar *buf, uint index, const uchar *key, DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type != F_UNLCK); DBUG_ASSERT(end_range == NULL); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, index, result, { result= index_read_idx_map(buf, index, key, keypart_map, find_flag); }) increment_statistics(&SSV::ha_read_key_count); if (!result) @@ -2984,7 +3184,7 @@ int handler::ha_index_next(uchar * buf) m_lock_type != F_UNLCK); DBUG_ASSERT(inited==INDEX); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result, { result= index_next(buf); }) increment_statistics(&SSV::ha_read_next_count); if (!result) @@ -2994,6 +3194,9 @@ int handler::ha_index_next(uchar * buf) table->update_virtual_fields(this, VCOL_UPDATE_FOR_READ); } table->status=result ? STATUS_NOT_FOUND: 0; + + DEBUG_SYNC(ha_thd(), "handler_ha_index_next_end"); + DBUG_RETURN(result); } @@ -3005,7 +3208,7 @@ int handler::ha_index_prev(uchar * buf) m_lock_type != F_UNLCK); DBUG_ASSERT(inited==INDEX); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result, { result= index_prev(buf); }) increment_statistics(&SSV::ha_read_prev_count); if (!result) @@ -3025,7 +3228,7 @@ int handler::ha_index_first(uchar * buf) m_lock_type != F_UNLCK); DBUG_ASSERT(inited==INDEX); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result, { result= index_first(buf); }) increment_statistics(&SSV::ha_read_first_count); if (!result) @@ -3045,7 +3248,7 @@ int handler::ha_index_last(uchar * buf) m_lock_type != F_UNLCK); DBUG_ASSERT(inited==INDEX); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result, { result= index_last(buf); }) increment_statistics(&SSV::ha_read_last_count); if (!result) @@ -3065,7 +3268,7 @@ int handler::ha_index_next_same(uchar *buf, const uchar *key, uint keylen) m_lock_type != F_UNLCK); DBUG_ASSERT(inited==INDEX); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result, { result= index_next_same(buf, key, keylen); }) increment_statistics(&SSV::ha_read_next_count); if (!result) @@ -3516,8 +3719,9 @@ int handler::update_auto_increment() variables->auto_increment_increment); auto_inc_intervals_count++; /* Row-based replication does not need to store intervals in binlog */ - if (((WSREP(thd) && wsrep_emulate_bin_log ) || mysql_bin_log.is_open()) - && !thd->is_current_stmt_binlog_format_row()) + if (((WSREP_NNULL(thd) && wsrep_emulate_bin_log) || + mysql_bin_log.is_open()) && + !thd->is_current_stmt_binlog_format_row()) thd->auto_inc_intervals_in_cur_stmt_for_binlog. append(auto_inc_interval_for_cur_row.minimum(), auto_inc_interval_for_cur_row.values(), @@ -4013,6 +4217,9 @@ void handler::print_error(int error, myf errflag) case HA_ERR_TABLE_IN_FK_CHECK: textno= ER_TABLE_IN_FK_CHECK; break; + case HA_ERR_COMMIT_ERROR: + textno= ER_ERROR_DURING_COMMIT; + break; default: { /* The error was "unknown" to this function. @@ -4119,7 +4326,7 @@ int handler::check_collation_compatibility() cs_number == 23 || /* cp1251_ukrainian_ci - bug #29461 */ cs_number == 26)) || /* cp1250_general_ci - bug #29461 */ (mysql_version < 50124 && - (cs_number == 33 || /* utf8_general_ci - bug #27877 */ + (cs_number == 33 || /* utf8mb3_general_ci - bug #27877 */ cs_number == 35))) /* ucs2_general_ci - bug #27877 */ return HA_ADMIN_NEEDS_UPGRADE; } @@ -4242,15 +4449,17 @@ uint handler::get_dup_key(int error) { DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type != F_UNLCK); DBUG_ENTER("handler::get_dup_key"); - if (table->s->long_unique_table && table->file->errkey < table->s->keys) - DBUG_RETURN(table->file->errkey); - table->file->errkey = (uint) -1; + + if (lookup_errkey != (uint)-1) + DBUG_RETURN(errkey= lookup_errkey); + + errkey= (uint)-1; if (error == HA_ERR_FOUND_DUPP_KEY || error == HA_ERR_FOREIGN_DUPLICATE_KEY || error == HA_ERR_FOUND_DUPP_UNIQUE || error == HA_ERR_NULL_IN_SPATIAL || error == HA_ERR_DROP_INDEX_FK) - table->file->info(HA_STATUS_ERRKEY | HA_STATUS_NO_LOCK); - DBUG_RETURN(table->file->errkey); + info(HA_STATUS_ERRKEY | HA_STATUS_NO_LOCK); + DBUG_RETURN(errkey); } @@ -4261,45 +4470,51 @@ uint handler::get_dup_key(int error) @note We assume that the handler may return more extensions than - was actually used for the file. + was actually used for the file. We also assume that the first + extension is the most important one (see the comment near + handlerton::tablefile_extensions). If this exist and we can't delete + that it, we will abort the delete. + If the first one doesn't exists, we have to try to delete all other + extension as there is chance that the server had crashed between + the delete of the first file and the next @retval 0 If we successfully deleted at least one file from base_ext and - didn't get any other errors than ENOENT + didn't get any other errors than ENOENT + @retval !0 Error */ + int handler::delete_table(const char *name) { - int saved_error= 0; - int error= 0; - int enoent_or_zero; + int saved_error= ENOENT; + bool abort_if_first_file_error= 1; + bool some_file_deleted= 0; + DBUG_ENTER("handler::delete_table"); - if (ht->discover_table) - enoent_or_zero= 0; // the table may not exist in the engine, it's ok - else - enoent_or_zero= ENOENT; // the first file of bas_ext() *must* exist - - for (const char **ext=bas_ext(); *ext ; ext++) + for (const char **ext= bas_ext(); *ext ; ext++) { - if (mysql_file_delete_with_symlink(key_file_misc, name, *ext, 0)) + int err= mysql_file_delete_with_symlink(key_file_misc, name, *ext, MYF(0)); + if (err) { if (my_errno != ENOENT) { + saved_error= my_errno; /* - If error on the first existing file, return the error. + If error other than file not found on the first existing file, + return the error. Otherwise delete as much as possible. */ - if (enoent_or_zero) - return my_errno; - saved_error= my_errno; + if (abort_if_first_file_error) + DBUG_RETURN(saved_error); } } else - enoent_or_zero= 0; // No error for ENOENT - error= enoent_or_zero; + some_file_deleted= 1; + abort_if_first_file_error= 0; } - return saved_error ? saved_error : error; + DBUG_RETURN(some_file_deleted && saved_error == ENOENT ? 0 : saved_error); } @@ -4335,6 +4550,23 @@ void handler::drop_table(const char *name) /** + Return true if the error from drop table means that the + table didn't exists +*/ + +bool non_existing_table_error(int error) +{ + return (error == ENOENT || + (error == EE_DELETE && my_errno == ENOENT) || + error == HA_ERR_NO_SUCH_TABLE || + error == HA_ERR_UNSUPPORTED || + error == ER_NO_SUCH_TABLE || + error == ER_NO_SUCH_TABLE_IN_ENGINE || + error == ER_WRONG_OBJECT); +} + + +/** Performs checks upon the table. @param thd thread doing CHECK TABLE operation @@ -4349,6 +4581,7 @@ void handler::drop_table(const char *name) @retval HA_ADMIN_NOT_IMPLEMENTED */ + int handler::ha_check(THD *thd, HA_CHECK_OPT *check_opt) { int error; @@ -4395,10 +4628,9 @@ void handler::mark_trx_read_write_internal() */ if (ha_info->is_started()) { - DBUG_ASSERT(has_transaction_manager()); /* - table_share can be NULL in ha_delete_table(). See implementation - of standalone function ha_delete_table() in sql_base.cc. + table_share can be NULL, for example, in ha_delete_table() or + ha_rename_table(). */ if (table_share == NULL || table_share->tmp_table == NO_TMP_TABLE) ha_info->set_trx_read_write(); @@ -4670,7 +4902,8 @@ handler::check_if_supported_inplace_alter(TABLE *altered_table, ALTER_DROP_CHECK_CONSTRAINT | ALTER_PARTITIONED | ALTER_VIRTUAL_GCOL_EXPR | - ALTER_RENAME; + ALTER_RENAME | + ALTER_RENAME_INDEX; /* Is there at least one operation that requires copy algorithm? */ if (ha_alter_info->handler_flags & ~inplace_offline_operations) @@ -4758,35 +4991,108 @@ handler::ha_rename_table(const char *from, const char *to) /** - Delete table: public interface. + Drop table in the engine: public interface. + + @sa handler::drop_table() - @sa handler::delete_table() + The difference between this and delete_table() is that the table is open in + drop_table(). */ -int -handler::ha_delete_table(const char *name) +void +handler::ha_drop_table(const char *name) { + DBUG_ASSERT(m_lock_type == F_UNLCK); + if (check_if_updates_are_ignored("DROP")) + return; + mark_trx_read_write(); - return delete_table(name); + drop_table(name); } /** - Drop table in the engine: public interface. + Structure used during force drop table. +*/ - @sa handler::drop_table() +struct st_force_drop_table_params +{ + const char *path; + const LEX_CSTRING *db; + const LEX_CSTRING *alias; + int error; + bool discovering; +}; - The difference between this and delete_table() is that the table is open in - drop_table(). + +/** + Try to delete table from a given plugin + Table types with discovery is ignored as these .frm files would have + been created during discovery and thus doesn't need to be found + for drop table force */ -void -handler::ha_drop_table(const char *name) +static my_bool delete_table_force(THD *thd, plugin_ref plugin, void *arg) { - DBUG_ASSERT(m_lock_type == F_UNLCK); - mark_trx_read_write(); + handlerton *hton = plugin_hton(plugin); + st_force_drop_table_params *param = (st_force_drop_table_params *)arg; - return drop_table(name); + if (param->discovering == (hton->discover_table != NULL) && + !(thd->slave_thread && (hton->flags & HTON_IGNORE_UPDATES))) + { + int error; + error= ha_delete_table(thd, hton, param->path, param->db, param->alias, 0); + if (error > 0 && !non_existing_table_error(error)) + param->error= error; + if (error == 0) + { + if (hton && hton->flags & HTON_TABLE_MAY_NOT_EXIST_ON_SLAVE) + thd->replication_flags |= OPTION_IF_EXISTS; + param->error= 0; + return TRUE; // Table was deleted + } + } + return FALSE; +} + +/** + @brief + Traverse all plugins to delete table when .frm file is missing. + + @return -1 Table was not found in any engine + @return 0 Table was found in some engine and delete succeded + @return # Error from first engine that had a table but didn't succeed to + delete the table + @return HA_ERR_ROW_IS_REFERENCED if foreign key reference is encountered, + +*/ + +int ha_delete_table_force(THD *thd, const char *path, const LEX_CSTRING *db, + const LEX_CSTRING *alias) +{ + st_force_drop_table_params param; + Table_exists_error_handler no_such_table_handler; + DBUG_ENTER("ha_delete_table_force"); + + param.path= path; + param.db= db; + param.alias= alias; + param.error= -1; // Table not found + param.discovering= true; + + thd->push_internal_handler(&no_such_table_handler); + if (plugin_foreach(thd, delete_table_force, MYSQL_STORAGE_ENGINE_PLUGIN, + ¶m)) + param.error= 0; // Delete succeded + else + { + param.discovering= false; + if (plugin_foreach(thd, delete_table_force, MYSQL_STORAGE_ENGINE_PLUGIN, + ¶m)) + param.error= 0; // Delete succeded + } + thd->pop_internal_handler(); + DBUG_RETURN(param.error); } @@ -4818,7 +5124,7 @@ handler::ha_create(const char *name, TABLE *form, HA_CREATE_INFO *info_arg) int handler::ha_create_partitioning_metadata(const char *name, const char *old_name, - int action_flag) + chf_create_flags action_flag) { /* Normally this is done when unlocked, but in fast_alter_partition_table, @@ -4910,7 +5216,7 @@ int ha_enable_transaction(THD *thd, bool on) DBUG_ENTER("ha_enable_transaction"); DBUG_PRINT("enter", ("on: %d", (int) on)); - if ((thd->transaction.on= on)) + if ((thd->transaction->on= on)) { /* Now all storage engines should have transaction handling enabled. @@ -5028,7 +5334,7 @@ void handler::update_global_table_stats() table->s->table_cache_key.length))) { if (!(table_stats = ((TABLE_STATS*) - my_malloc(sizeof(TABLE_STATS), + my_malloc(PSI_INSTRUMENT_ME, sizeof(TABLE_STATS), MYF(MY_WME | MY_ZEROFILL))))) { /* Out of memory error already given */ @@ -5093,7 +5399,7 @@ void handler::update_global_index_stats() key_length))) { if (!(index_stats = ((INDEX_STATS*) - my_malloc(sizeof(INDEX_STATS), + my_malloc(PSI_INSTRUMENT_ME, sizeof(INDEX_STATS), MYF(MY_WME | MY_ZEROFILL))))) goto end; // Error is already given @@ -5417,7 +5723,7 @@ static my_bool discover_handlerton(THD *thd, plugin_ref plugin, { TABLE_SHARE *share= (TABLE_SHARE *)arg; handlerton *hton= plugin_hton(plugin); - if (hton->state == SHOW_OPTION_YES && hton->discover_table) + if (hton->discover_table) { share->db_plugin= plugin; int error= hton->discover_table(hton, thd, share); @@ -5493,7 +5799,7 @@ static my_bool discover_existence(THD *thd, plugin_ref plugin, { st_discover_existence_args *args= (st_discover_existence_args*)arg; handlerton *ht= plugin_hton(plugin); - if (ht->state != SHOW_OPTION_YES || !ht->discover_table_existence) + if (!ht->discover_table_existence) return args->frm_exists; args->hton= ht; @@ -5505,43 +5811,6 @@ static my_bool discover_existence(THD *thd, plugin_ref plugin, return ht->discover_table_existence(ht, args->db, args->table_name); } -class Table_exists_error_handler : public Internal_error_handler -{ -public: - Table_exists_error_handler() - : m_handled_errors(0), m_unhandled_errors(0) - {} - - bool handle_condition(THD *thd, - uint sql_errno, - const char* sqlstate, - Sql_condition::enum_warning_level *level, - const char* msg, - Sql_condition ** cond_hdl) - { - *cond_hdl= NULL; - if (sql_errno == ER_NO_SUCH_TABLE || - sql_errno == ER_NO_SUCH_TABLE_IN_ENGINE || - sql_errno == ER_WRONG_OBJECT) - { - m_handled_errors++; - return TRUE; - } - - if (*level == Sql_condition::WARN_LEVEL_ERROR) - m_unhandled_errors++; - return FALSE; - } - - bool safely_trapped_errors() - { - return ((m_handled_errors > 0) && (m_unhandled_errors == 0)); - } - -private: - int m_handled_errors; - int m_unhandled_errors; -}; /** Check if a given table exists, without doing a full discover, if possible @@ -5570,7 +5839,8 @@ private: *hton will be NULL. */ -bool ha_table_exists(THD *thd, const LEX_CSTRING *db, const LEX_CSTRING *table_name, +bool ha_table_exists(THD *thd, const LEX_CSTRING *db, + const LEX_CSTRING *table_name, handlerton **hton, bool *is_sequence) { handlerton *dummy; @@ -5607,24 +5877,30 @@ bool ha_table_exists(THD *thd, const LEX_CSTRING *db, const LEX_CSTRING *table_n { char engine_buf[NAME_CHAR_LEN + 1]; LEX_CSTRING engine= { engine_buf, 0 }; - Table_type type; + Table_type type= dd_frm_type(thd, path, &engine); - if ((type= dd_frm_type(thd, path, &engine, is_sequence)) == - TABLE_TYPE_UNKNOWN) - DBUG_RETURN(0); - - if (type != TABLE_TYPE_VIEW) - { - plugin_ref p= plugin_lock_by_name(thd, &engine, - MYSQL_STORAGE_ENGINE_PLUGIN); - *hton= p ? plugin_hton(p) : NULL; - if (*hton) - // verify that the table really exists - exists= discover_existence(thd, p, &args); - } - else + switch (type) { + case TABLE_TYPE_UNKNOWN: + DBUG_PRINT("exit", ("Exist, cannot be opened")); + DBUG_RETURN(true); // Frm exists + case TABLE_TYPE_VIEW: *hton= view_pseudo_hton; + DBUG_PRINT("exit", ("Exist, view")); + DBUG_RETURN(true); // Frm exists + case TABLE_TYPE_SEQUENCE: + *is_sequence= true; + /* fall through */ + case TABLE_TYPE_NORMAL: + { + plugin_ref p= plugin_lock_by_name(thd, &engine, + MYSQL_STORAGE_ENGINE_PLUGIN); + *hton= p ? plugin_hton(p) : NULL; + if (*hton) // verify that the table really exists + exists= discover_existence(thd, p, &args); + } + } } + DBUG_PRINT("exit", (exists ? "Exists" : "Does not exist")); DBUG_RETURN(exists); } @@ -5634,13 +5910,16 @@ bool ha_table_exists(THD *thd, const LEX_CSTRING *db, const LEX_CSTRING *table_n { if (hton) *hton= args.hton; + DBUG_PRINT("exit", ("discovery found file")); DBUG_RETURN(TRUE); } if (need_full_discover_for_existence) { TABLE_LIST table; + bool exists; uint flags = GTS_TABLE | GTS_VIEW; + if (!hton) flags|= GTS_NOLOCK; @@ -5657,12 +5936,43 @@ bool ha_table_exists(THD *thd, const LEX_CSTRING *db, const LEX_CSTRING *table_n } // the table doesn't exist if we've caught ER_NO_SUCH_TABLE and nothing else - DBUG_RETURN(!no_such_table_handler.safely_trapped_errors()); + exists= !no_such_table_handler.safely_trapped_errors(); + DBUG_PRINT("exit", (exists ? "Exists" : "Does not exist")); + DBUG_RETURN(exists); } + DBUG_PRINT("exit", ("Does not exist")); DBUG_RETURN(FALSE); } + +/* + Check if the CREATE/ALTER table should be ignored + This could happen for slaves where the table is shared between master + and slave + + If statement is ignored, write a note +*/ + +bool handler::check_if_updates_are_ignored(const char *op) const +{ + return ha_check_if_updates_are_ignored(table->in_use, ht, op); +} + + +bool ha_check_if_updates_are_ignored(THD *thd, handlerton *hton, + const char *op) +{ + DBUG_ENTER("ha_check_if_updates_are_ignored"); + if (!thd->slave_thread || !(hton= ha_checktype(thd, hton, 1))) + DBUG_RETURN(0); // Not slave or no engine + if (!(hton->flags & HTON_IGNORE_UPDATES)) + DBUG_RETURN(0); // Not shared table + my_error(ER_SLAVE_IGNORED_SHARED_TABLE, MYF(ME_NOTE), op); + DBUG_RETURN(1); +} + + /** Discover all table names in a given database */ @@ -5673,13 +5983,13 @@ static int cmp_file_names(const void *a, const void *b) CHARSET_INFO *cs= character_set_filesystem; char *aa= ((FILEINFO *)a)->name; char *bb= ((FILEINFO *)b)->name; - return my_strnncoll(cs, (uchar*)aa, strlen(aa), (uchar*)bb, strlen(bb)); + return cs->strnncoll(aa, strlen(aa), bb, strlen(bb)); } static int cmp_table_names(LEX_CSTRING * const *a, LEX_CSTRING * const *b) { - return my_strnncoll(&my_charset_bin, (uchar*)((*a)->str), (*a)->length, - (uchar*)((*b)->str), (*b)->length); + return my_charset_bin.strnncoll((*a)->str, (*a)->length, + (*b)->str, (*b)->length); } #ifndef DBUG_OFF @@ -5713,8 +6023,8 @@ bool Discovered_table_list::add_table(const char *tname, size_t tlen) custom discover_table_names() method, that calls add_table() directly). Note: avoid comparing the same name twice (here and in add_file). */ - if (wild && my_wildcmp(table_alias_charset, tname, tname + tlen, wild, wend, - wild_prefix, wild_one, wild_many)) + if (wild && table_alias_charset->wildcmp(tname, tname + tlen, wild, wend, + wild_prefix, wild_one, wild_many)) return 0; LEX_CSTRING *name= thd->make_clex_string(tname, tlen); @@ -5783,7 +6093,7 @@ static my_bool discover_names(THD *thd, plugin_ref plugin, st_discover_names_args *args= (st_discover_names_args *)arg; handlerton *ht= plugin_hton(plugin); - if (ht->state == SHOW_OPTION_YES && ht->discover_table_names) + if (ht->discover_table_names) { size_t old_elements= args->result->tables->elements(); if (ht->discover_table_names(ht, args->db, args->dirp, args->result)) @@ -5828,6 +6138,8 @@ int ha_discover_table_names(THD *thd, LEX_CSTRING *db, MY_DIR *dirp, error= ext_table_discovery_simple(dirp, result) || plugin_foreach(thd, discover_names, MYSQL_STORAGE_ENGINE_PLUGIN, &args); + if (args.possible_duplicates > 0) + result->remove_duplicates(); } else { @@ -6040,7 +6352,7 @@ extern "C" check_result_t handler_index_cond_check(void* h_arg) check_result_t res; DEBUG_SYNC(thd, "handler_index_cond_check"); - enum thd_kill_levels abort_at= h->has_transactions() ? + enum thd_kill_levels abort_at= h->has_rollback() ? THD_ABORT_SOFTLY : THD_ABORT_ASAP; if (thd_kill_level(thd) > abort_at) return CHECK_ABORTED_BY_USER; @@ -6202,7 +6514,7 @@ static my_bool showstat_handlerton(THD *thd, plugin_ref plugin, { enum ha_stat_type stat= *(enum ha_stat_type *) arg; handlerton *hton= plugin_hton(plugin); - if (hton->state == SHOW_OPTION_YES && hton->show_status && + if (hton->show_status && hton->show_status(hton, thd, stat_print, stat)) return TRUE; return FALSE; @@ -6234,17 +6546,8 @@ bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat) } else { - if (db_type->state != SHOW_OPTION_YES) - { - const LEX_CSTRING *name= hton_name(db_type); - result= stat_print(thd, name->str, name->length, - "", 0, "DISABLED", 8) ? 1 : 0; - } - else - { - result= db_type->show_status && - db_type->show_status(db_type, thd, stat_print, stat) ? 1 : 0; - } + result= db_type->show_status && + db_type->show_status(db_type, thd, stat_print, stat) ? 1 : 0; } /* @@ -6276,32 +6579,35 @@ bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat) 1 Row needs to be logged */ -bool handler::check_table_binlog_row_based(bool binlog_row) +bool handler::check_table_binlog_row_based() { - if (table->versioned(VERS_TRX_ID)) - return false; - if (unlikely((table->in_use->variables.sql_log_bin_off))) - return 0; /* Called by partitioning engine */ -#ifdef WITH_WSREP - if (!table->in_use->variables.sql_log_bin && - wsrep_thd_is_applying(table->in_use)) - return 0; /* wsrep patch sets sql_log_bin to silence binlogging - from high priority threads */ -#endif /* WITH_WSREP */ if (unlikely((!check_table_binlog_row_based_done))) { check_table_binlog_row_based_done= 1; check_table_binlog_row_based_result= - check_table_binlog_row_based_internal(binlog_row); + check_table_binlog_row_based_internal(); } return check_table_binlog_row_based_result; } -bool handler::check_table_binlog_row_based_internal(bool binlog_row) +bool handler::check_table_binlog_row_based_internal() { THD *thd= table->in_use; +#ifdef WITH_WSREP + if (!thd->variables.sql_log_bin && + wsrep_thd_is_applying(table->in_use)) + { + /* + wsrep patch sets sql_log_bin to silence binlogging from high + priority threads + */ + return 0; + } +#endif return (table->s->can_do_row_logging && + !table->versioned(VERS_TRX_ID) && + !(thd->variables.option_bits & OPTION_BIN_TMP_LOG_OFF) && thd->is_current_stmt_binlog_format_row() && /* Wsrep partially enables binary logging if it have not been @@ -6317,9 +6623,9 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row) Otherwise, return 'true' if binary logging is on. */ - IF_WSREP(((WSREP_EMULATE_BINLOG(thd) && + IF_WSREP(((WSREP_EMULATE_BINLOG_NNULL(thd) && wsrep_thd_is_local(thd)) || - ((WSREP(thd) || + ((WSREP_NNULL(thd) || (thd->variables.option_bits & OPTION_BIN_LOG)) && mysql_bin_log.is_open())), (thd->variables.option_bits & OPTION_BIN_LOG) && @@ -6327,137 +6633,22 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row) } -/** @brief - Write table maps for all (manually or automatically) locked tables - to the binary log. Also, if binlog_annotate_row_events is ON, - write Annotate_rows event before the first table map. - - SYNOPSIS - write_locked_table_maps() - thd Pointer to THD structure - - DESCRIPTION - This function will generate and write table maps for all tables - that are locked by the thread 'thd'. - - RETURN VALUE - 0 All OK - 1 Failed to write all table maps - - SEE ALSO - THD::lock -*/ - -static int write_locked_table_maps(THD *thd) -{ - DBUG_ENTER("write_locked_table_maps"); - DBUG_PRINT("enter", ("thd:%p thd->lock:%p " - "thd->extra_lock: %p", - thd, thd->lock, thd->extra_lock)); - - DBUG_PRINT("debug", ("get_binlog_table_maps(): %d", thd->get_binlog_table_maps())); - - MYSQL_LOCK *locks[2]; - locks[0]= thd->extra_lock; - locks[1]= thd->lock; - my_bool with_annotate= IF_WSREP(!wsrep_fragments_certified_for_stmt(thd), - true) && - thd->variables.binlog_annotate_row_events && - thd->query() && thd->query_length(); - - for (uint i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i ) - { - MYSQL_LOCK const *const lock= locks[i]; - if (lock == NULL) - continue; - - TABLE **const end_ptr= lock->table + lock->table_count; - for (TABLE **table_ptr= lock->table ; - table_ptr != end_ptr ; - ++table_ptr) - { - TABLE *const table= *table_ptr; - DBUG_PRINT("info", ("Checking table %s", table->s->table_name.str)); - if (table->current_lock == F_WRLCK && - table->file->check_table_binlog_row_based(0)) - { - /* - We need to have a transactional behavior for SQLCOM_CREATE_TABLE - (e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a - compatible behavior with the STMT based replication even when - the table is not transactional. In other words, if the operation - fails while executing the insert phase nothing is written to the - binlog. - - Note that at this point, we check the type of a set of tables to - create the table map events. In the function binlog_log_row(), - which calls the current function, we check the type of the table - of the current row. - */ - bool const has_trans= thd->lex->sql_command == SQLCOM_CREATE_TABLE || - table->file->has_transactions(); - int const error= thd->binlog_write_table_map(table, has_trans, - &with_annotate); - /* - If an error occurs, it is the responsibility of the caller to - roll back the transaction. - */ - if (unlikely(error)) - DBUG_RETURN(1); - } - } - } - DBUG_RETURN(0); -} - - -static int binlog_log_row_internal(TABLE* table, - const uchar *before_record, - const uchar *after_record, - Log_func *log_func) +int handler::binlog_log_row(TABLE *table, + const uchar *before_record, + const uchar *after_record, + Log_func *log_func) { - bool error= 0; - THD *const thd= table->in_use; - - /* - If there are no table maps written to the binary log, this is - the first row handled in this statement. In that case, we need - to write table maps for all locked tables to the binary log. - */ - if (likely(!(error= ((thd->get_binlog_table_maps() == 0 && - write_locked_table_maps(thd)))))) - { - /* - We need to have a transactional behavior for SQLCOM_CREATE_TABLE - (i.e. CREATE TABLE... SELECT * FROM TABLE) in order to keep a - compatible behavior with the STMT based replication even when - the table is not transactional. In other words, if the operation - fails while executing the insert phase nothing is written to the - binlog. - */ - bool const has_trans= thd->lex->sql_command == SQLCOM_CREATE_TABLE || - table->file->has_transactions(); - error= (*log_func)(thd, table, has_trans, before_record, after_record); - } - return error ? HA_ERR_RBR_LOGGING_FAILED : 0; -} - -int binlog_log_row(TABLE* table, const uchar *before_record, - const uchar *after_record, Log_func *log_func) -{ -#ifdef WITH_WSREP - THD *const thd= table->in_use; + bool error; + THD *thd= table->in_use; + DBUG_ENTER("binlog_log_row"); - /* only InnoDB tables will be replicated through binlog emulation */ - if ((WSREP_EMULATE_BINLOG(thd) && - !(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) || - thd->wsrep_ignore_table == true) - return 0; -#endif + if (!thd->binlog_table_maps && + thd->binlog_write_table_maps()) + DBUG_RETURN(HA_ERR_RBR_LOGGING_FAILED); - if (!table->file->check_table_binlog_row_based(1)) - return 0; - return binlog_log_row_internal(table, before_record, after_record, log_func); + error= (*log_func)(thd, table, row_logging_has_trans, + before_record, after_record); + DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0); } @@ -6503,7 +6694,7 @@ int handler::ha_external_lock(THD *thd, int lock_type) We cache the table flags if the locking succeeded. Otherwise, we keep them as they were when they were fetched in ha_open(). */ - MYSQL_TABLE_LOCK_WAIT(m_psi, PSI_TABLE_EXTERNAL_LOCK, lock_type, + MYSQL_TABLE_LOCK_WAIT(PSI_TABLE_EXTERNAL_LOCK, lock_type, { error= external_lock(thd, lock_type); }) DBUG_EXECUTE_IF("external_lock_failure", error= HA_ERR_GENERIC;); @@ -6543,6 +6734,7 @@ int handler::ha_external_lock(THD *thd, int lock_type) int handler::ha_reset() { DBUG_ENTER("ha_reset"); + /* Check that we have called all proper deallocation functions */ DBUG_ASSERT((uchar*) table->def_read_set.bitmap + table->s->column_bitmap_size == @@ -6556,12 +6748,22 @@ int handler::ha_reset() pushed_cond= NULL; tracker= NULL; mark_trx_read_write_done= 0; + /* + Disable row logging. + */ + row_logging= row_logging_init= 0; clear_cached_table_binlog_row_based_flag(); /* Reset information about pushed engine conditions */ cancel_pushed_idx_cond(); /* Reset information about pushed index conditions */ cancel_pushed_rowid_filter(); - clear_top_table_fields(); + if (lookup_handler != this) + { + lookup_handler->ha_external_unlock(table->in_use); + lookup_handler->close(); + delete lookup_handler; + lookup_handler= this; + } DBUG_RETURN(reset()); } @@ -6569,17 +6771,20 @@ int handler::ha_reset() static int wsrep_after_row(THD *thd) { DBUG_ENTER("wsrep_after_row"); + if (thd->internal_transaction()) + DBUG_RETURN(0); + /* enforce wsrep_max_ws_rows */ thd->wsrep_affected_rows++; if (wsrep_max_ws_rows && - wsrep_thd_is_local(thd) && - thd->wsrep_affected_rows > wsrep_max_ws_rows) + thd->wsrep_affected_rows > wsrep_max_ws_rows && + wsrep_thd_is_local(thd)) { trans_rollback_stmt(thd) || trans_rollback(thd); my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0)); DBUG_RETURN(ER_ERROR_DURING_COMMIT); } - else if (wsrep_after_row(thd, false)) + else if (wsrep_after_row_internal(thd)) { DBUG_RETURN(ER_LOCK_DEADLOCK); } @@ -6587,33 +6792,33 @@ static int wsrep_after_row(THD *thd) } #endif /* WITH_WSREP */ -static int check_duplicate_long_entry_key(TABLE *table, handler *h, - const uchar *new_rec, uint key_no) + +/** + Check if there is a conflicting unique hash key +*/ + +int handler::check_duplicate_long_entry_key(const uchar *new_rec, uint key_no) { - Field *hash_field; int result, error= 0; KEY *key_info= table->key_info + key_no; - hash_field= key_info->key_part->field; + Field *hash_field= key_info->key_part->field; uchar ptr[HA_HASH_KEY_LENGTH_WITH_NULL]; + DBUG_ENTER("handler::check_duplicate_long_entry_key"); DBUG_ASSERT((key_info->flags & HA_NULL_PART_KEY && - key_info->key_length == HA_HASH_KEY_LENGTH_WITH_NULL) - || key_info->key_length == HA_HASH_KEY_LENGTH_WITHOUT_NULL); + key_info->key_length == HA_HASH_KEY_LENGTH_WITH_NULL) || + key_info->key_length == HA_HASH_KEY_LENGTH_WITHOUT_NULL); if (hash_field->is_real_null()) - return 0; + DBUG_RETURN(0); key_copy(ptr, new_rec, key_info, key_info->key_length, false); - if (!table->check_unique_buf) - table->check_unique_buf= (uchar *)alloc_root(&table->mem_root, - table->s->reclength); - - result= h->ha_index_init(key_no, 0); + result= lookup_handler->ha_index_init(key_no, 0); if (result) - return result; - store_record(table, check_unique_buf); - result= h->ha_index_read_map(table->record[0], + DBUG_RETURN(result); + store_record(table, file->lookup_buffer); + result= lookup_handler->ha_index_read_map(table->record[0], ptr, HA_WHOLE_KEY, HA_READ_KEY_EXACT); if (!result) { @@ -6624,7 +6829,7 @@ static int check_duplicate_long_entry_key(TABLE *table, handler *h, uint arg_count= temp->argument_count(); do { - my_ptrdiff_t diff= table->check_unique_buf - new_rec; + my_ptrdiff_t diff= table->file->lookup_buffer - new_rec; is_same= true; for (uint j=0; is_same && j < arg_count; j++) { @@ -6649,8 +6854,9 @@ static int check_duplicate_long_entry_key(TABLE *table, handler *h, } } } - while (!is_same && !(result= h->ha_index_next_same(table->record[0], - ptr, key_info->key_length))); + while (!is_same && + !(result= lookup_handler->ha_index_next_same(table->record[0], + ptr, key_info->key_length))); if (is_same) error= HA_ERR_FOUND_DUPP_KEY; goto exit; @@ -6660,16 +6866,25 @@ static int check_duplicate_long_entry_key(TABLE *table, handler *h, exit: if (error == HA_ERR_FOUND_DUPP_KEY) { - table->file->errkey= key_no; - if (h->ha_table_flags() & HA_DUPLICATE_POS) + table->file->lookup_errkey= key_no; + if (ha_table_flags() & HA_DUPLICATE_POS) { - h->position(table->record[0]); - memcpy(table->file->dup_ref, h->ref, h->ref_length); + lookup_handler->position(table->record[0]); + memcpy(table->file->dup_ref, lookup_handler->ref, ref_length); } } - restore_record(table, check_unique_buf); - h->ha_index_end(); - return error; + restore_record(table, file->lookup_buffer); + lookup_handler->ha_index_end(); + DBUG_RETURN(error); +} + +void handler::alloc_lookup_buffer() +{ + if (!lookup_buffer) + lookup_buffer= (uchar*)alloc_root(&table->mem_root, + table_share->max_unique_length + + table_share->null_fields + + table_share->reclength); } /** @brief @@ -6677,20 +6892,20 @@ exit: unique constraint on long columns. @returns 0 if no duplicate else returns error */ -static int check_duplicate_long_entries(TABLE *table, handler *h, - const uchar *new_rec) +int handler::check_duplicate_long_entries(const uchar *new_rec) { - table->file->errkey= -1; - int result; + lookup_errkey= (uint)-1; for (uint i= 0; i < table->s->keys; i++) { + int result; if (table->key_info[i].algorithm == HA_KEY_ALG_LONG_HASH && - (result= check_duplicate_long_entry_key(table, h, new_rec, i))) + (result= check_duplicate_long_entry_key(new_rec, i))) return result; } return 0; } + /** @brief check whether updated records breaks the unique constraint on long columns. @@ -6705,11 +6920,10 @@ static int check_duplicate_long_entries(TABLE *table, handler *h, key as a parameter in normal insert key should be -1 @returns 0 if no duplicate else returns error */ -static int check_duplicate_long_entries_update(TABLE *table, handler *h, uchar *new_rec) +int handler::check_duplicate_long_entries_update(const uchar *new_rec) { Field *field; uint key_parts; - int error= 0; KEY *keyinfo; KEY_PART_INFO *keypart; /* @@ -6717,7 +6931,7 @@ static int check_duplicate_long_entries_update(TABLE *table, handler *h, uchar * with respect to fields in hash_str */ uint reclength= (uint) (table->record[1] - table->record[0]); - table->clone_handler_for_update(); + for (uint i= 0; i < table->s->keys; i++) { keyinfo= table->key_info + i; @@ -6727,13 +6941,13 @@ static int check_duplicate_long_entries_update(TABLE *table, handler *h, uchar * keypart= keyinfo->key_part - key_parts; for (uint j= 0; j < key_parts; j++, keypart++) { + int error; field= keypart->field; - /* Compare fields if they are different then check for duplicates*/ - if(field->cmp_binary_offset(reclength)) + /* Compare fields if they are different then check for duplicates */ + if (field->cmp_binary_offset(reclength)) { - if((error= check_duplicate_long_entry_key(table, table->update_handler, - new_rec, i))) - goto exit; + if((error= check_duplicate_long_entry_key(new_rec, i))) + return error; /* break because check_duplicate_long_entries_key will take care of remaining fields @@ -6743,42 +6957,233 @@ static int check_duplicate_long_entries_update(TABLE *table, handler *h, uchar * } } } - exit: + return 0; +} + + +int handler::ha_check_overlaps(const uchar *old_data, const uchar* new_data) +{ + DBUG_ASSERT(new_data); + if (this != table->file) + return 0; + if (!table_share->period.unique_keys) + return 0; + if (table->versioned() && !table->vers_end_field()->is_max()) + return 0; + + const bool is_update= old_data != NULL; + uchar *record_buffer= lookup_buffer + table_share->max_unique_length + + table_share->null_fields; + + // Needed to compare record refs later + if (is_update) + position(old_data); + + DBUG_ASSERT(!keyread_enabled()); + + int error= 0; + lookup_errkey= (uint)-1; + + for (uint key_nr= 0; key_nr < table_share->keys && !error; key_nr++) + { + const KEY &key_info= table->key_info[key_nr]; + const uint key_parts= key_info.user_defined_key_parts; + if (!key_info.without_overlaps) + continue; + + if (is_update) + { + bool key_used= false; + for (uint k= 0; k < key_parts && !key_used; k++) + key_used= bitmap_is_set(table->write_set, + key_info.key_part[k].fieldnr - 1); + if (!key_used) + continue; + } + + error= lookup_handler->ha_index_init(key_nr, 0); + if (error) + return error; + + error= lookup_handler->ha_start_keyread(key_nr); + DBUG_ASSERT(!error); + + const uint period_field_length= key_info.key_part[key_parts - 1].length; + const uint key_base_length= key_info.key_length - 2 * period_field_length; + + key_copy(lookup_buffer, new_data, &key_info, 0); + + /* Copy period_start to period_end. + the value in period_start field is not significant, but anyway let's leave + it defined to avoid uninitialized memory access + */ + memcpy(lookup_buffer + key_base_length, + lookup_buffer + key_base_length + period_field_length, + period_field_length); + + /* Find row with period_end > (period_start of new_data) */ + error = lookup_handler->ha_index_read_map(record_buffer, lookup_buffer, + key_part_map((1 << (key_parts - 1)) - 1), + HA_READ_AFTER_KEY); + + if (!error && is_update) + { + /* In case of update it could happen that the nearest neighbour is + a record we are updating. It means, that there are no overlaps + from this side. + */ + DBUG_ASSERT(lookup_handler != this); + DBUG_ASSERT(ref_length == lookup_handler->ref_length); + + lookup_handler->position(record_buffer); + if (memcmp(ref, lookup_handler->ref, ref_length) == 0) + error= lookup_handler->ha_index_next(record_buffer); + } + + if (!error && table->check_period_overlaps(key_info, new_data, record_buffer)) + error= HA_ERR_FOUND_DUPP_KEY; + + if (error == HA_ERR_KEY_NOT_FOUND || error == HA_ERR_END_OF_FILE) + error= 0; + + if (error == HA_ERR_FOUND_DUPP_KEY) + lookup_errkey= key_nr; + + int end_error= lookup_handler->ha_end_keyread(); + DBUG_ASSERT(!end_error); + + end_error= lookup_handler->ha_index_end(); + if (!error && end_error) + error= end_error; + } + return error; } + +/** + Check if galera disables binary logging for this table + + @return 0 Binary logging disabled + @return 1 Binary logging can be enabled +*/ + + +static inline bool wsrep_check_if_binlog_row(TABLE *table) +{ +#ifdef WITH_WSREP + THD *const thd= table->in_use; + + /* only InnoDB tables will be replicated through binlog emulation */ + if ((WSREP_EMULATE_BINLOG(thd) && + !(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) || + thd->wsrep_ignore_table == true) + return 0; +#endif + return 1; +} + + +/** + Prepare handler for row logging + + @return 0 if handler will not participate in row logging + @return 1 handler will participate in row logging + + This function is always safe to call on an opened table. +*/ + +bool handler::prepare_for_row_logging() +{ + DBUG_ENTER("handler::prepare_for_row_logging"); + + /* Check if we should have row logging */ + if (wsrep_check_if_binlog_row(table) && + check_table_binlog_row_based()) + { + /* + Row logging enabled. Intialize all variables and write + annotated and table maps + */ + row_logging= row_logging_init= 1; + + /* + We need to have a transactional behavior for SQLCOM_CREATE_TABLE + (e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a + compatible behavior with the STMT based replication even when + the table is not transactional. In other words, if the operation + fails while executing the insert phase nothing is written to the + binlog. + */ + row_logging_has_trans= + ((sql_command_flags[table->in_use->lex->sql_command] & + (CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) || + table->file->has_transactions_and_rollback()); + } + else + { + /* Check row_logging has not been properly cleared from previous command */ + DBUG_ASSERT(row_logging == 0); + } + DBUG_RETURN(row_logging); +} + + +/* + Do all initialization needed for insert +*/ + +int handler::prepare_for_insert(bool do_create) +{ + /* Preparation for unique of blob's */ + if (table->s->long_unique_table || table->s->period.unique_keys) + { + if (do_create && create_lookup_handler()) + return 1; + alloc_lookup_buffer(); + } + return 0; +} + + int handler::ha_write_row(const uchar *buf) { int error; - Log_func *log_func= Write_rows_log_event::binlog_row_logging_function; DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type == F_WRLCK); DBUG_ENTER("handler::ha_write_row"); DEBUG_SYNC_C("ha_write_row_start"); - MYSQL_INSERT_ROW_START(table_share->db.str, table_share->table_name.str); - mark_trx_read_write(); - increment_statistics(&SSV::ha_write_count); + if ((error= ha_check_overlaps(NULL, buf))) + DBUG_RETURN(error); - if (table->s->long_unique_table) + if (table->s->long_unique_table && this == table->file) { - if (this->inited == RND) - table->clone_handler_for_update(); - handler *h= table->update_handler ? table->update_handler : table->file; - if ((error= check_duplicate_long_entries(table, h, buf))) + DBUG_ASSERT(inited == NONE || lookup_handler != this); + if ((error= check_duplicate_long_entries(buf))) DBUG_RETURN(error); } - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_WRITE_ROW, MAX_KEY, 0, + + MYSQL_INSERT_ROW_START(table_share->db.str, table_share->table_name.str); + mark_trx_read_write(); + increment_statistics(&SSV::ha_write_count); + + TABLE_IO_WAIT(tracker, PSI_TABLE_WRITE_ROW, MAX_KEY, error, { error= write_row(buf); }) MYSQL_INSERT_ROW_DONE(error); - if (likely(!error) && !row_already_logged) + if (likely(!error)) { rows_changed++; - error= binlog_log_row(table, 0, buf, log_func); + if (row_logging) + { + Log_func *log_func= Write_rows_log_event::binlog_row_logging_function; + error= binlog_log_row(table, 0, buf, log_func); + } #ifdef WITH_WSREP - if (table_share->tmp_table == NO_TMP_TABLE && - WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd()))) + if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE && + ht->flags & HTON_WSREP_REPLICATION && + !error && (error= wsrep_after_row(ha_thd()))) { DBUG_RETURN(error); } @@ -6793,10 +7198,8 @@ int handler::ha_write_row(const uchar *buf) int handler::ha_update_row(const uchar *old_data, const uchar *new_data) { int error; - Log_func *log_func= Update_rows_log_event::binlog_row_logging_function; DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type == F_WRLCK); - /* Some storage engines require that the new record is in record[0] (and the old record is in record[1]). @@ -6804,41 +7207,50 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data) DBUG_ASSERT(new_data == table->record[0]); DBUG_ASSERT(old_data == table->record[1]); + uint saved_status= table->status; + error= ha_check_overlaps(old_data, new_data); + + if (!error && table->s->long_unique_table && this == table->file) + error= check_duplicate_long_entries_update(new_data); + table->status= saved_status; + + if (error) + return error; + MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str); mark_trx_read_write(); increment_statistics(&SSV::ha_update_count); - if (table->s->long_unique_table && - (error= check_duplicate_long_entries_update(table, table->file, (uchar *)new_data))) - { - return error; - } - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_UPDATE_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, 0, { error= update_row(old_data, new_data);}) MYSQL_UPDATE_ROW_DONE(error); - if (likely(!error) && !row_already_logged) + if (likely(!error)) { rows_changed++; - error= binlog_log_row(table, old_data, new_data, log_func); + if (row_logging) + { + Log_func *log_func= Update_rows_log_event::binlog_row_logging_function; + error= binlog_log_row(table, old_data, new_data, log_func); + } #ifdef WITH_WSREP THD *thd= ha_thd(); - bool is_wsrep= WSREP(thd); - /* for SR, the followin wsrep_after_row() may replicate a fragment, so we have to - declare potential PA unsafe before that*/ - if (table->s->primary_key == MAX_KEY && - is_wsrep && wsrep_thd_is_local(thd)) + if (WSREP_NNULL(thd)) { - WSREP_DEBUG("marking trx as PA unsafe pk %d", table->s->primary_key); - if (thd->wsrep_cs().mark_transaction_pa_unsafe()) + /* for streaming replication, the following wsrep_after_row() + may replicate a fragment, so we have to declare potential PA + unsafe before that */ + if (table->s->primary_key == MAX_KEY && wsrep_thd_is_local(thd)) { - WSREP_DEBUG("session does not have active transaction, can not mark as PA unsafe"); + WSREP_DEBUG("marking trx as PA unsafe pk %d", table->s->primary_key); + if (thd->wsrep_cs().mark_transaction_pa_unsafe()) + WSREP_DEBUG("session does not have active transaction," + " can not mark as PA unsafe"); } - } - if (table_share->tmp_table == NO_TMP_TABLE && - is_wsrep && (error= wsrep_after_row(thd))) - { - return error; + + if (!error && table_share->tmp_table == NO_TMP_TABLE && + ht->flags & HTON_WSREP_REPLICATION) + error= wsrep_after_row(thd); } #endif /* WITH_WSREP */ } @@ -6876,7 +7288,6 @@ int handler::update_first_row(const uchar *new_data) int handler::ha_delete_row(const uchar *buf) { int error; - Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function; DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type == F_WRLCK); /* @@ -6889,31 +7300,35 @@ int handler::ha_delete_row(const uchar *buf) mark_trx_read_write(); increment_statistics(&SSV::ha_delete_count); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_DELETE_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_DELETE_ROW, active_index, error, { error= delete_row(buf);}) MYSQL_DELETE_ROW_DONE(error); if (likely(!error)) { rows_changed++; - error= binlog_log_row(table, buf, 0, log_func); + if (row_logging) + { + Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function; + error= binlog_log_row(table, buf, 0, log_func); + } #ifdef WITH_WSREP THD *thd= ha_thd(); - bool is_wsrep= WSREP(thd); - /* for SR, the followin wsrep_after_row() may replicate a fragment, so we have to - declare potential PA unsafe before that*/ - if (table->s->primary_key == MAX_KEY && - is_wsrep && wsrep_thd_is_local(thd)) + if (WSREP_NNULL(thd)) { - WSREP_DEBUG("marking trx as PA unsafe pk %d", table->s->primary_key); - if (thd->wsrep_cs().mark_transaction_pa_unsafe()) + /* for streaming replication, the following wsrep_after_row() + may replicate a fragment, so we have to declare potential PA + unsafe before that */ + if (table->s->primary_key == MAX_KEY && wsrep_thd_is_local(thd)) { - WSREP_DEBUG("session does not have active transaction, can not mark as PA unsafe"); + WSREP_DEBUG("marking trx as PA unsafe pk %d", table->s->primary_key); + if (thd->wsrep_cs().mark_transaction_pa_unsafe()) + WSREP_DEBUG("session does not have active transaction," + " can not mark as PA unsafe"); } - } - if (table_share->tmp_table == NO_TMP_TABLE && - is_wsrep && (error= wsrep_after_row(thd))) - { - return error; + + if (!error && table_share->tmp_table == NO_TMP_TABLE && + ht->flags & HTON_WSREP_REPLICATION) + error= wsrep_after_row(thd); } #endif /* WITH_WSREP */ } @@ -6938,11 +7353,10 @@ int handler::ha_delete_row(const uchar *buf) int handler::ha_direct_update_rows(ha_rows *update_rows, ha_rows *found_rows) { int error; - MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str); mark_trx_read_write(); - error = direct_update_rows(update_rows, found_rows); + error= direct_update_rows(update_rows, found_rows); MYSQL_UPDATE_ROW_DONE(error); return error; } @@ -7376,7 +7790,7 @@ int del_global_table_stat(THD *thd, const LEX_CSTRING *db, const LEX_CSTRING *ta cache_key_length= db->length + 1 + table->length + 1; - if(!(cache_key= (uchar *)my_malloc(cache_key_length, + if(!(cache_key= (uchar *)my_malloc(PSI_INSTRUMENT_ME, cache_key_length, MYF(MY_WME | MY_ZEROFILL)))) { /* Out of memory error already given */ @@ -7600,11 +8014,7 @@ bool Table_scope_and_contents_source_st::vers_check_system_fields( if (!(alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING)) return false; - bool can_native= ha_check_storage_engine_flag(db_type, - HTON_NATIVE_SYS_VERSIONING) - || db_type->db_type == DB_TYPE_PARTITION_DB; - - return vers_info.check_sys_fields(table_name, db, alter_info, can_native); + return vers_info.check_sys_fields(table_name, db, alter_info); } @@ -7715,7 +8125,16 @@ bool Vers_parse_info::fix_alter_info(THD *thd, Alter_info *alter_info, return false; } - return fix_implicit(thd, alter_info); + if (fix_implicit(thd, alter_info)) + return true; + + if (alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING) + { + if (check_sys_fields(table_name, share->db, alter_info)) + return true; + } + + return false; } bool @@ -7835,48 +8254,106 @@ bool Vers_parse_info::check_conditions(const Lex_table_name &table_name, return false; } +static bool is_versioning_timestamp(const Column_definition *f) +{ + return f->type_handler() == &type_handler_timestamp2 && + f->length == MAX_DATETIME_FULL_WIDTH; +} -bool Create_field::vers_check_timestamp(const Lex_table_name &table_name) const +static bool is_some_bigint(const Column_definition *f) { - if (type_handler() == &type_handler_timestamp2 && - length == MAX_DATETIME_FULL_WIDTH) - return false; + return f->type_handler() == &type_handler_slonglong || + f->type_handler() == &type_handler_ulonglong || + f->type_handler() == &type_handler_vers_trx_id; +} - my_error(ER_VERS_FIELD_WRONG_TYPE, MYF(0), field_name.str, "TIMESTAMP(6)", - table_name.str); - return true; +static bool is_versioning_bigint(const Column_definition *f) +{ + return is_some_bigint(f) && f->flags & UNSIGNED_FLAG && + f->length == MY_INT64_NUM_DECIMAL_DIGITS - 1; } +static void require_timestamp_error(const char *field, const char *table) +{ + my_error(ER_VERS_FIELD_WRONG_TYPE, MYF(0), field, "TIMESTAMP(6)", table); +} -bool Create_field::vers_check_bigint(const Lex_table_name &table_name) const +static void require_trx_id_error(const char *field, const char *table) { - if (is_some_bigint() && flags & UNSIGNED_FLAG && - length == MY_INT64_NUM_DECIMAL_DIGITS - 1) - return false; + my_error(ER_VERS_FIELD_WRONG_TYPE, MYF(0), field, "BIGINT(20) UNSIGNED", + table); +} + + +bool Vers_type_timestamp::check_sys_fields(const LEX_CSTRING &table_name, + const Column_definition *row_start, + const Column_definition *row_end) const +{ + if (!is_versioning_timestamp(row_start)) + { + require_timestamp_error(row_start->field_name.str, table_name.str); + return true; + } - my_error(ER_VERS_FIELD_WRONG_TYPE, MYF(0), field_name.str, - "BIGINT(20) UNSIGNED", table_name.str); - return true; + if (row_end->type_handler()->vers() != this || + !is_versioning_timestamp(row_end)) + { + require_timestamp_error(row_end->field_name.str, table_name.str); + return true; + } + + return false; +} + + +bool Vers_type_trx::check_sys_fields(const LEX_CSTRING &table_name, + const Column_definition *row_start, + const Column_definition *row_end) const +{ + if (!is_versioning_bigint(row_start)) + { + require_trx_id_error(row_start->field_name.str, table_name.str); + return true; + } + + if (row_end->type_handler()->vers() != this || + !is_versioning_bigint(row_end)) + { + require_trx_id_error(row_end->field_name.str, table_name.str); + return true; + } + + if (!is_some_bigint(row_start)) + { + require_timestamp_error(row_start->field_name.str, table_name.str); + return true; + } + + if (!TR_table::use_transaction_registry) + { + my_error(ER_VERS_TRT_IS_DISABLED, MYF(0)); + return true; + } + + return false; } bool Vers_parse_info::check_sys_fields(const Lex_table_name &table_name, const Lex_table_name &db, - Alter_info *alter_info, - bool can_native) const + Alter_info *alter_info) const { if (check_conditions(table_name, db)) return true; - const Create_field *row_start= NULL; - const Create_field *row_end= NULL; - List_iterator<Create_field> it(alter_info->create_list); - while (Create_field *f= it++) + const Create_field *row_start= nullptr; + const Create_field *row_end= nullptr; + while (const Create_field *f= it++) { - if (!row_start && f->flags & VERS_ROW_START) + if (f->flags & VERS_ROW_START && !row_start) row_start= f; - else if (!row_end && f->flags & VERS_ROW_END) + if (f->flags & VERS_ROW_END && !row_end) row_end= f; } @@ -7886,28 +8363,15 @@ bool Vers_parse_info::check_sys_fields(const Lex_table_name &table_name, return true; } - if (!can_native || - !row_start->is_some_bigint() || - !row_end->is_some_bigint()) - { - if (row_start->vers_check_timestamp(table_name) || - row_end->vers_check_timestamp(table_name)) - return true; - } - else - { - if (row_start->vers_check_bigint(table_name) || - row_end->vers_check_bigint(table_name)) - return true; + const Vers_type_handler *row_start_vers= row_start->type_handler()->vers(); - if (!TR_table::use_transaction_registry) - { - my_error(ER_VERS_TRT_IS_DISABLED, MYF(0)); - return true; - } + if (!row_start_vers) + { + require_timestamp_error(row_start->field_name.str, table_name); + return true; } - return false; + return row_start_vers->check_sys_fields(table_name, row_start, row_end); } bool Table_period_info::check_field(const Create_field* f, |