summaryrefslogtreecommitdiff
path: root/sql/handler.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/handler.cc')
-rw-r--r--sql/handler.cc1172
1 files changed, 777 insertions, 395 deletions
diff --git a/sql/handler.cc b/sql/handler.cc
index 79dcee037a5..e70cef5742e 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1,5 +1,5 @@
/* Copyright (c) 2000, 2016, Oracle and/or its affiliates.
- Copyright (c) 2009, 2019, MariaDB Corporation.
+ Copyright (c) 2009, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -43,6 +43,7 @@
#include "debug_sync.h" // DEBUG_SYNC
#include "sql_audit.h"
#include "ha_sequence.h"
+#include "rowid_filter.h"
#ifdef WITH_PARTITION_STORAGE_ENGINE
#include "ha_partition.h"
@@ -54,8 +55,12 @@
#include "semisync_master.h"
#include "wsrep_mysqld.h"
-#include "wsrep.h"
+#ifdef WITH_WSREP
+#include "wsrep_binlog.h"
#include "wsrep_xid.h"
+#include "wsrep_thd.h"
+#include "wsrep_trans_observer.h" /* wsrep transaction hooks */
+#endif /* WITH_WSREP */
/*
While we have legacy_db_type, we have this array to
@@ -128,6 +133,23 @@ static plugin_ref ha_default_tmp_plugin(THD *thd)
return ha_default_plugin(thd);
}
+#if defined(WITH_ARIA_STORAGE_ENGINE) && MYSQL_VERSION_ID < 100500
+void ha_maria_implicit_commit(THD *thd, bool new_trn) // wraps ha_maria::implicit_commit() in an explicit MDL_BACKUP_COMMIT lock request
+{
+ if (ha_maria::has_active_transaction(thd)) // does nothing when ha_maria reports no active transaction for thd
+ {
+ int error; // result of acquire_lock(); zero is treated below as lock acquired
+ MDL_request mdl_request;
+ mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, MDL_EXPLICIT);
+ error= thd->mdl_context.acquire_lock(&mdl_request,
+ thd->variables.lock_wait_timeout); // timeout argument is the session lock_wait_timeout
+ ha_maria::implicit_commit(thd, new_trn); // called regardless of the acquire_lock() result; new_trn is passed through unchanged
+ if (!error) // release only when acquire_lock() returned zero
+ thd->mdl_context.release_lock(mdl_request.ticket);
+ }
+}
+#endif
+
/** @brief
Return the default storage engine handlerton for thread
@@ -277,6 +299,9 @@ handlerton *ha_checktype(THD *thd, handlerton *hton, bool no_substitute)
if (no_substitute)
return NULL;
+#ifdef WITH_WSREP
+ (void)wsrep_after_rollback(thd, false);
+#endif /* WITH_WSREP */
return ha_default_handlerton(thd);
} /* ha_checktype */
@@ -322,7 +347,7 @@ handler *get_ha_partition(partition_info *part_info)
}
else
{
- my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR),
+ my_error(ER_OUTOFMEMORY, MYF(ME_FATAL),
static_cast<int>(sizeof(ha_partition)));
}
DBUG_RETURN(((handler*) partition));
@@ -432,9 +457,9 @@ static int ha_finish_errors(void)
return 0;
}
-static volatile int32 need_full_discover_for_existence= 0;
-static volatile int32 engines_with_discover_file_names= 0;
-static volatile int32 engines_with_discover= 0;
+static Atomic_counter<int32> need_full_discover_for_existence(0); // adjusted for engines whose discover_table_existence is full_discover_for_existence
+static Atomic_counter<int32> engines_with_discover_file_names(0); // adjusted for engines with discover_table_names and a non-null tablefile_extensions[0]
+static Atomic_counter<int32> engines_with_discover(0); // adjusted for engines that set discover_table
static int full_discover_for_existence(handlerton *, const char *, const char *) // compared by address in update_discovery_counters(); ignores its arguments
{ return 0; }
@@ -456,13 +481,13 @@ static int hton_ext_based_table_discovery(handlerton *hton, LEX_CSTRING *db,
static void update_discovery_counters(handlerton *hton, int val) // adds val to every discovery counter whose condition hton satisfies
{
if (hton->discover_table_existence == full_discover_for_existence) // engine uses the full_discover_for_existence callback
- my_atomic_add32(&need_full_discover_for_existence, val);
+ need_full_discover_for_existence+= val;
if (hton->discover_table_names && hton->tablefile_extensions[0]) // engine sets discover_table_names and has a non-null first file extension
- my_atomic_add32(&engines_with_discover_file_names, val);
+ engines_with_discover_file_names+= val;
if (hton->discover_table) // engine sets discover_table
- my_atomic_add32(&engines_with_discover, val);
+ engines_with_discover+= val;
}
int ha_finalize_handlerton(st_plugin_int *plugin)
@@ -709,7 +734,7 @@ int ha_init()
binary log (which is considered a transaction-capable storage engine in
counting total_ha)
*/
- opt_using_transactions= total_ha>(ulong)opt_bin_log;
+ opt_using_transactions= total_ha > (ulong) opt_bin_log;
savepoint_alloc_size+= sizeof(SAVEPOINT);
DBUG_RETURN(error);
}
@@ -719,7 +744,6 @@ int ha_end()
int error= 0;
DBUG_ENTER("ha_end");
-
/*
This should be eventually based on the graceful shutdown flag.
So if flag is equal to HA_PANIC_CLOSE, the deallocate
@@ -835,11 +859,10 @@ static my_bool kill_handlerton(THD *thd, plugin_ref plugin,
{
handlerton *hton= plugin_hton(plugin);
- mysql_mutex_lock(&thd->LOCK_thd_data);
+ mysql_mutex_assert_owner(&thd->LOCK_thd_data);
if (hton->state == SHOW_OPTION_YES && hton->kill_query &&
thd_get_ha_data(thd, hton))
hton->kill_query(hton, thd, *(enum thd_kill_levels *) level);
- mysql_mutex_unlock(&thd->LOCK_thd_data);
return FALSE;
}
@@ -851,6 +874,43 @@ void ha_kill_query(THD* thd, enum thd_kill_levels level)
}
+/*****************************************************************************
+ Backup functions
+******************************************************************************/
+
+static my_bool plugin_prepare_for_backup(THD *unused1, plugin_ref plugin, // per-plugin callback passed to plugin_foreach_with_mask() by ha_prepare_for_backup()
+ void *not_used)
+{
+ handlerton *hton= plugin_hton(plugin);
+ if (hton->state == SHOW_OPTION_YES && hton->prepare_for_backup) // only engines in state SHOW_OPTION_YES that provide the hook
+ hton->prepare_for_backup();
+ return FALSE; // always FALSE; presumably tells the plugin iteration to continue - TODO confirm
+}
+
+void ha_prepare_for_backup() // runs plugin_prepare_for_backup over storage engine plugins
+{
+ plugin_foreach_with_mask(0, plugin_prepare_for_backup, // first argument (THD) is 0
+ MYSQL_STORAGE_ENGINE_PLUGIN,
+ PLUGIN_IS_DELETED|PLUGIN_IS_READY, 0); // plugin state mask, then a 0 callback argument that the callback does not read
+}
+
+static my_bool plugin_end_backup(THD *unused1, plugin_ref plugin, // per-plugin callback passed to plugin_foreach_with_mask() by ha_end_backup()
+ void *not_used)
+{
+ handlerton *hton= plugin_hton(plugin);
+ if (hton->state == SHOW_OPTION_YES && hton->end_backup) // only engines in state SHOW_OPTION_YES that provide the hook
+ hton->end_backup();
+ return FALSE; // always FALSE; presumably tells the plugin iteration to continue - TODO confirm
+}
+
+void ha_end_backup() // runs plugin_end_backup over storage engine plugins
+{
+ plugin_foreach_with_mask(0, plugin_end_backup, // first argument (THD) is 0
+ MYSQL_STORAGE_ENGINE_PLUGIN,
+ PLUGIN_IS_DELETED|PLUGIN_IS_READY, 0); // plugin state mask, then a 0 callback argument that the callback does not read
+}
+
+
/* ========================================================================
======================= TRANSACTIONS ===================================*/
@@ -1183,25 +1243,40 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht_arg)
ha_info->register_ha(trans, ht_arg);
trans->no_2pc|=(ht_arg->prepare==0);
- if (thd->transaction.xid_state.xid.is_null())
- thd->transaction.xid_state.xid.set(thd->query_id);
+
+ /* Set implicit xid even if there's explicit XA, it will be ignored anyway. */
+ if (thd->transaction.implicit_xid.is_null())
+ thd->transaction.implicit_xid.set(thd->query_id);
+
DBUG_VOID_RETURN;
}
static int prepare_or_error(handlerton *ht, THD *thd, bool all) // calls ht->prepare(), raises ER_ERROR_DURING_COMMIT on failure, returns non-zero on error
{
+#ifdef WITH_WSREP
+ const bool run_wsrep_hooks= wsrep_run_commit_hook(thd, all);
+ if (run_wsrep_hooks && ht->flags & HTON_WSREP_REPLICATION && // note: & binds tighter than &&, so the flag test is a single operand
+ wsrep_before_prepare(thd, all))
+ {
+ return(1); // before-prepare hook returned non-zero: ht->prepare() is not called and no my_error() is raised here
+ }
+#endif /* WITH_WSREP */
+
int err= ht->prepare(ht, thd, all);
status_var_increment(thd->status_var.ha_prepare_count); // incremented whether or not prepare succeeded
if (err)
{
- /* avoid sending error, if we're going to replay the transaction */
-#ifdef WITH_WSREP
- if (ht != wsrep_hton ||
- err == EMSGSIZE || thd->wsrep_conflict_state != MUST_REPLAY)
-#endif
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
}
+#ifdef WITH_WSREP
+ if (run_wsrep_hooks && !err && ht->flags & HTON_WSREP_REPLICATION &&
+ wsrep_after_prepare(thd, all)) // after-prepare hook is evaluated only when prepare itself returned zero
+ {
+ err= 1; // hook failure is reported to the caller as error 1
+ }
+#endif /* WITH_WSREP */
+
return err;
}
@@ -1342,6 +1417,9 @@ int ha_commit_trans(THD *thd, bool all)
Ha_trx_info *ha_info= trans->ha_list;
bool need_prepare_ordered, need_commit_ordered;
my_xid xid;
+#ifdef WITH_WSREP
+ const bool run_wsrep_hooks= wsrep_run_commit_hook(thd, all);
+#endif /* WITH_WSREP */
DBUG_ENTER("ha_commit_trans");
DBUG_PRINT("info",("thd: %p option_bits: %lu all: %d",
thd, (ulong) thd->variables.option_bits, all));
@@ -1385,10 +1463,6 @@ int ha_commit_trans(THD *thd, bool all)
DBUG_RETURN(2);
}
-#ifdef WITH_ARIA_STORAGE_ENGINE
- ha_maria::implicit_commit(thd, TRUE);
-#endif
-
if (!ha_info)
{
/*
@@ -1396,7 +1470,15 @@ int ha_commit_trans(THD *thd, bool all)
*/
if (is_real_trans)
thd->transaction.cleanup();
- DBUG_RETURN(0);
+#ifdef WITH_WSREP
+ if (wsrep_is_active(thd) && is_real_trans && !error)
+ {
+ wsrep_commit_empty(thd, all);
+ }
+#endif /* WITH_WSREP */
+
+ ha_maria_implicit_commit(thd, TRUE);
+ DBUG_RETURN(error);
}
DBUG_EXECUTE_IF("crash_commit_before", DBUG_SUICIDE(););
@@ -1409,7 +1491,7 @@ int ha_commit_trans(THD *thd, bool all)
/* rw_trans is TRUE when we in a transaction changing data */
bool rw_trans= is_real_trans &&
(rw_ha_count > (thd->is_current_stmt_binlog_disabled()?0U:1U));
- MDL_request mdl_request;
+ MDL_request mdl_backup;
DBUG_PRINT("info", ("is_real_trans: %d rw_trans: %d rw_ha_count: %d",
is_real_trans, rw_trans, rw_ha_count));
@@ -1423,19 +1505,25 @@ int ha_commit_trans(THD *thd, bool all)
We allow the owner of FTWRL to COMMIT; we assume that it knows
what it does.
*/
- mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
- MDL_EXPLICIT);
+ mdl_backup.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, MDL_EXPLICIT);
- if (!WSREP(thd) &&
- thd->mdl_context.acquire_lock(&mdl_request,
- thd->variables.lock_wait_timeout))
+ if (!WSREP(thd))
{
- ha_rollback_trans(thd, all);
- DBUG_RETURN(1);
+ if (thd->mdl_context.acquire_lock(&mdl_backup,
+ thd->variables.lock_wait_timeout))
+ {
+ ha_rollback_trans(thd, all);
+ DBUG_RETURN(1);
+ }
+ thd->backup_commit_lock= &mdl_backup;
}
-
DEBUG_SYNC(thd, "ha_commit_trans_after_acquire_commit_lock");
+
+ /* Use shortcut as we already have the MDL_BACKUP_COMMIT lock */
+ ha_maria::implicit_commit(thd, TRUE);
}
+ else
+ ha_maria_implicit_commit(thd, TRUE);
if (rw_trans &&
opt_readonly &&
@@ -1483,13 +1571,33 @@ int ha_commit_trans(THD *thd, bool all)
if (trans->no_2pc || (rw_ha_count <= 1))
{
+#ifdef WITH_WSREP
+ /*
+ This commit will not go through log_and_order() where wsrep commit
+ ordering is normally done. Commit ordering must be done here.
+ */
+ if (run_wsrep_hooks)
+ error= wsrep_before_commit(thd, all);
+ if (error)
+ {
+ ha_rollback_trans(thd, FALSE);
+ goto wsrep_err;
+ }
+#endif /* WITH_WSREP */
error= ha_commit_one_phase(thd, all);
+#ifdef WITH_WSREP
+ if (run_wsrep_hooks)
+ error= error || wsrep_after_commit(thd, all);
+#endif /* WITH_WSREP */
goto done;
}
need_prepare_ordered= FALSE;
need_commit_ordered= FALSE;
- xid= thd->transaction.xid_state.xid.get_my_xid();
+ DBUG_ASSERT(thd->transaction.implicit_xid.get_my_xid() ==
+ thd->transaction.implicit_xid.quick_get_my_xid());
+ xid= thd->transaction.xid_state.is_explicit_XA() ? 0 :
+ thd->transaction.implicit_xid.quick_get_my_xid();
for (Ha_trx_info *hi= ha_info; hi; hi= hi->next())
{
@@ -1515,10 +1623,14 @@ int ha_commit_trans(THD *thd, bool all)
DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE(););
#ifdef WITH_WSREP
- if (!error && WSREP_ON && wsrep_is_wsrep_xid(&thd->transaction.xid_state.xid))
+ if (run_wsrep_hooks && !error)
{
- // xid was rewritten by wsrep
- xid= wsrep_xid_seqno(thd->transaction.xid_state.xid);
+ wsrep::seqno const s= wsrep_xid_seqno(thd->wsrep_xid);
+ if (!s.is_undefined())
+ {
+ // xid was rewritten by wsrep
+ xid= s.get();
+ }
}
#endif /* WITH_WSREP */
@@ -1527,18 +1639,35 @@ int ha_commit_trans(THD *thd, bool all)
error= commit_one_phase_2(thd, all, trans, is_real_trans);
goto done;
}
-
+#ifdef WITH_WSREP
+ if (run_wsrep_hooks && (error = wsrep_before_commit(thd, all)))
+ goto wsrep_err;
+#endif /* WITH_WSREP */
DEBUG_SYNC(thd, "ha_commit_trans_before_log_and_order");
cookie= tc_log->log_and_order(thd, xid, all, need_prepare_ordered,
need_commit_ordered);
if (!cookie)
+ {
+ WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie);
goto err;
-
+ }
DEBUG_SYNC(thd, "ha_commit_trans_after_log_and_order");
DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE(););
error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0;
-
+#ifdef WITH_WSREP
+ if (run_wsrep_hooks && (error || (error = wsrep_after_commit(thd, all))))
+ {
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ if (wsrep_must_abort(thd))
+ {
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ (void)tc_log->unlog(cookie, xid);
+ goto wsrep_err;
+ }
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ }
+#endif /* WITH_WSREP */
DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE(););
if (tc_log->unlog(cookie, xid))
{
@@ -1560,6 +1689,19 @@ done:
goto end;
/* Come here if error and we need to rollback. */
+#ifdef WITH_WSREP
+wsrep_err:
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ if (run_wsrep_hooks && wsrep_must_abort(thd))
+ {
+ WSREP_DEBUG("BF abort has happened after prepare & certify");
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ ha_rollback_trans(thd, TRUE);
+ }
+ else
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+
+#endif /* WITH_WSREP */
err:
error= 1; /* Transaction was rolled back */
/*
@@ -1569,9 +1711,13 @@ err:
*/
if (!(thd->rgi_slave && thd->rgi_slave->is_parallel_exec))
ha_rollback_trans(thd, all);
-
+ else
+ {
+ WSREP_DEBUG("rollback skipped %p %d",thd->rgi_slave,
+ thd->rgi_slave->is_parallel_exec);
+ }
end:
- if (rw_trans && mdl_request.ticket)
+ if (mdl_backup.ticket)
{
/*
We do not always immediately release transactional locks
@@ -1579,14 +1725,25 @@ end:
thus we release the commit blocker lock as soon as it's
not needed.
*/
- thd->mdl_context.release_lock(mdl_request.ticket);
+ thd->mdl_context.release_lock(mdl_backup.ticket);
}
+ thd->backup_commit_lock= 0;
+#ifdef WITH_WSREP
+ if (wsrep_is_active(thd) && is_real_trans && !error &&
+ (rw_ha_count == 0 || all) &&
+ wsrep_not_committed(thd))
+ {
+ wsrep_commit_empty(thd, all);
+ }
+#endif /* WITH_WSREP */
+
DBUG_RETURN(error);
}
/**
@note
- This function does not care about global read lock. A caller should.
+ This function does not care about global read lock or backup locks,
+ the caller should.
@param[in] all Is set in case of explicit commit
(COMMIT statement), or implicit commit
@@ -1635,6 +1792,7 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
DBUG_ENTER("commit_one_phase_2");
if (is_real_trans)
DEBUG_SYNC(thd, "commit_one_phase_2");
+
if (ha_info)
{
for (; ha_info; ha_info= ha_info_next)
@@ -1663,6 +1821,7 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
#endif
}
}
+
/* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans)
{
@@ -1738,6 +1897,9 @@ int ha_rollback_trans(THD *thd, bool all)
DBUG_RETURN(1);
}
+#ifdef WITH_WSREP
+ (void) wsrep_before_rollback(thd, all);
+#endif /* WITH_WSREP */
if (ha_info)
{
/* Close all cursors that can not survive ROLLBACK */
@@ -1753,9 +1915,9 @@ int ha_rollback_trans(THD *thd, bool all)
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
error=1;
#ifdef WITH_WSREP
- WSREP_WARN("handlerton rollback failed, thd %llu %lld conf %d SQL %s",
- thd->thread_id, thd->query_id, thd->wsrep_conflict_state,
- thd->query());
+ WSREP_WARN("handlerton rollback failed, thd %lld %lld conf %d SQL %s",
+ thd->thread_id, thd->query_id, thd->wsrep_trx().state(),
+ thd->query());
#endif /* WITH_WSREP */
}
status_var_increment(thd->status_var.ha_rollback_count);
@@ -1766,17 +1928,25 @@ int ha_rollback_trans(THD *thd, bool all)
trans->no_2pc=0;
}
- /*
- Thanks to possibility of MDL deadlock rollback request can come even if
- transaction hasn't been started in any transactional storage engine.
- */
- if (is_real_trans && thd->transaction_rollback_request &&
- thd->transaction.xid_state.xa_state != XA_NOTR)
- thd->transaction.xid_state.rm_error= thd->get_stmt_da()->sql_errno();
-
+#ifdef WITH_WSREP
+ if (thd->is_error())
+ {
+ WSREP_DEBUG("ha_rollback_trans(%lld, %s) rolled back: %s: %s; is_real %d",
+ thd->thread_id, all?"TRUE":"FALSE", WSREP_QUERY(thd),
+ thd->get_stmt_da()->message(), is_real_trans);
+ }
+ (void) wsrep_after_rollback(thd, all);
+#endif /* WITH_WSREP */
/* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans)
{
+ /*
+ Thanks to possibility of MDL deadlock rollback request can come even if
+ transaction hasn't been started in any transactional storage engine.
+ */
+ if (thd->transaction_rollback_request)
+ thd->transaction.xid_state.set_error(thd->get_stmt_da()->sql_errno());
+
thd->has_waiter= false;
thd->transaction.cleanup();
}
@@ -1911,22 +2081,16 @@ char *xid_to_str(char *buf, const XID &xid)
}
#endif
-#ifdef WITH_WSREP
static my_xid wsrep_order_and_check_continuity(XID *list, int len)
{
+#ifdef WITH_WSREP
wsrep_sort_xid_array(list, len);
- wsrep_uuid_t uuid;
- wsrep_seqno_t seqno;
- if (wsrep_get_SE_checkpoint(uuid, seqno))
- {
- WSREP_ERROR("Could not read wsrep SE checkpoint for recovery");
- return 0;
- }
- long long cur_seqno= seqno;
+ wsrep::gtid cur_position= wsrep_get_SE_checkpoint();
+ long long cur_seqno= cur_position.seqno().get();
for (int i= 0; i < len; ++i)
{
if (!wsrep_is_wsrep_xid(list + i) ||
- wsrep_xid_seqno(*(list + i)) != cur_seqno + 1)
+ wsrep_xid_seqno(list + i) != cur_seqno + 1)
{
WSREP_WARN("Discovered discontinuity in recovered wsrep "
"transaction XIDs. Truncating the recovery list to "
@@ -1937,9 +2101,10 @@ static my_xid wsrep_order_and_check_continuity(XID *list, int len)
}
WSREP_INFO("Last wsrep seqno to be recovered %lld", cur_seqno);
return (cur_seqno < 0 ? 0 : cur_seqno);
-}
+#else
+ return 0;
#endif /* WITH_WSREP */
-
+}
/**
recover() step of xa.
@@ -1977,33 +2142,43 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin,
{
sql_print_information("Found %d prepared transaction(s) in %s",
got, hton_name(hton)->str);
-#ifdef WITH_WSREP
/* If wsrep_on=ON, XIDs are first ordered and then the range of
recovered XIDs is checked for continuity. All the XIDs which
are in continuous range can be safely committed if binlog
is off since they have already ordered and certified in the
- cluster. */
- my_xid wsrep_limit= 0;
+ cluster.
+
+ The discontinuity of wsrep XIDs may happen because the GTID
+ is assigned for transaction in wsrep_before_prepare(), but the
+ commit order is entered in wsrep_before_commit(). This means that
+ transactions may run prepare step out of order and may
+ result in gap in wsrep XIDs. This can be the case for example
+ if we have T1 with seqno 1 and T2 with seqno 2 and the server
+ crashes after T2 finishes prepare step but before T1 starts
+ the prepare.
+ */
+ my_xid wsrep_limit __attribute__((unused))= 0;
+
+ /* Note that this could also be called for the binlog, which
+ will not have WSREP(thd), but the global wsrep on might
+ be true.
+ */
if (WSREP_ON)
- {
wsrep_limit= wsrep_order_and_check_continuity(info->list, got);
- }
-#endif /* WITH_WSREP */
for (int i=0; i < got; i ++)
{
- my_xid x= IF_WSREP(WSREP_ON && wsrep_is_wsrep_xid(&info->list[i]) ?
- wsrep_xid_seqno(info->list[i]) :
+ my_xid x= IF_WSREP(wsrep_is_wsrep_xid(&info->list[i]) ?
+ wsrep_xid_seqno(&info->list[i]) :
info->list[i].get_my_xid(),
info->list[i].get_my_xid());
if (!x) // not "mine" - that is generated by external TM
{
-#ifndef DBUG_OFF
- char buf[XIDDATASIZE*4+6]; // see xid_to_str
- DBUG_PRINT("info",
- ("ignore xid %s", xid_to_str(buf, info->list[i])));
-#endif
- xid_cache_insert(info->list+i, XA_PREPARED);
+ DBUG_EXECUTE("info",{
+ char buf[XIDDATASIZE*4+6];
+ _db_doprnt_("ignore xid %s", xid_to_str(buf, info->list[i]));
+ });
+ xid_cache_insert(info->list + i);
info->found_foreign_xids++;
continue;
}
@@ -2023,33 +2198,25 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin,
my_hash_search(info->commit_list, (uchar *)&x, sizeof(x)) != 0 :
tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT))
{
-#ifndef DBUG_OFF
- int rc=
-#endif
- hton->commit_by_xid(hton, info->list+i);
-#ifndef DBUG_OFF
+ int rc= hton->commit_by_xid(hton, info->list+i);
if (rc == 0)
{
- char buf[XIDDATASIZE*4+6]; // see xid_to_str
- DBUG_PRINT("info",
- ("commit xid %s", xid_to_str(buf, info->list[i])));
+ DBUG_EXECUTE("info",{
+ char buf[XIDDATASIZE*4+6];
+ _db_doprnt_("commit xid %s", xid_to_str(buf, info->list[i]));
+ });
}
-#endif
}
else
{
-#ifndef DBUG_OFF
- int rc=
-#endif
- hton->rollback_by_xid(hton, info->list+i);
-#ifndef DBUG_OFF
+ int rc= hton->rollback_by_xid(hton, info->list+i);
if (rc == 0)
{
- char buf[XIDDATASIZE*4+6]; // see xid_to_str
- DBUG_PRINT("info",
- ("rollback xid %s", xid_to_str(buf, info->list[i])));
+ DBUG_EXECUTE("info",{
+ char buf[XIDDATASIZE*4+6];
+ _db_doprnt_("rollback xid %s", xid_to_str(buf, info->list[i]));
+ });
}
-#endif
}
}
if (got < info->len)
@@ -2116,186 +2283,6 @@ int ha_recover(HASH *commit_list)
DBUG_RETURN(0);
}
-/**
- return the XID as it appears in the SQL function's arguments.
- So this string can be passed to XA START, XA PREPARE etc...
-
- @note
- the 'buf' has to have space for at least SQL_XIDSIZE bytes.
-*/
-
-
-/*
- 'a'..'z' 'A'..'Z', '0'..'9'
- and '-' '_' ' ' symbols don't have to be
- converted.
-*/
-
-static const char xid_needs_conv[128]=
-{
- 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
- 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
- 0,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,
- 0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,
- 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
- 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,
- 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
- 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1
-};
-
-uint get_sql_xid(XID *xid, char *buf)
-{
- int tot_len= xid->gtrid_length + xid->bqual_length;
- int i;
- const char *orig_buf= buf;
-
- for (i=0; i<tot_len; i++)
- {
- uchar c= ((uchar *) xid->data)[i];
- if (c >= 128 || xid_needs_conv[c])
- break;
- }
-
- if (i >= tot_len)
- {
- /* No need to convert characters to hexadecimals. */
- *buf++= '\'';
- memcpy(buf, xid->data, xid->gtrid_length);
- buf+= xid->gtrid_length;
- *buf++= '\'';
- if (xid->bqual_length > 0 || xid->formatID != 1)
- {
- *buf++= ',';
- *buf++= '\'';
- memcpy(buf, xid->data+xid->gtrid_length, xid->bqual_length);
- buf+= xid->bqual_length;
- *buf++= '\'';
- }
- }
- else
- {
- *buf++= 'X';
- *buf++= '\'';
- for (i= 0; i < xid->gtrid_length; i++)
- {
- *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
- *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
- }
- *buf++= '\'';
- if (xid->bqual_length > 0 || xid->formatID != 1)
- {
- *buf++= ',';
- *buf++= 'X';
- *buf++= '\'';
- for (; i < tot_len; i++)
- {
- *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
- *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
- }
- *buf++= '\'';
- }
- }
-
- if (xid->formatID != 1)
- {
- *buf++= ',';
- buf+= my_longlong10_to_str_8bit(&my_charset_bin, buf,
- MY_INT64_NUM_DECIMAL_DIGITS, -10, xid->formatID);
- }
-
- return (uint)(buf - orig_buf);
-}
-
-
-/**
- return the list of XID's to a client, the same way SHOW commands do.
-
- @note
- I didn't find in XA specs that an RM cannot return the same XID twice,
- so mysql_xa_recover does not filter XID's to ensure uniqueness.
- It can be easily fixed later, if necessary.
-*/
-
-static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol,
- char *data, uint data_len, CHARSET_INFO *data_cs)
-{
- if (xs->xa_state == XA_PREPARED)
- {
- protocol->prepare_for_resend();
- protocol->store_longlong((longlong) xs->xid.formatID, FALSE);
- protocol->store_longlong((longlong) xs->xid.gtrid_length, FALSE);
- protocol->store_longlong((longlong) xs->xid.bqual_length, FALSE);
- protocol->store(data, data_len, data_cs);
- if (protocol->write())
- return TRUE;
- }
- return FALSE;
-}
-
-
-static my_bool xa_recover_callback_short(XID_STATE *xs, Protocol *protocol)
-{
- return xa_recover_callback(xs, protocol, xs->xid.data,
- xs->xid.gtrid_length + xs->xid.bqual_length, &my_charset_bin);
-}
-
-
-static my_bool xa_recover_callback_verbose(XID_STATE *xs, Protocol *protocol)
-{
- char buf[SQL_XIDSIZE];
- uint len= get_sql_xid(&xs->xid, buf);
- return xa_recover_callback(xs, protocol, buf, len,
- &my_charset_utf8_general_ci);
-}
-
-
-bool mysql_xa_recover(THD *thd)
-{
- List<Item> field_list;
- Protocol *protocol= thd->protocol;
- MEM_ROOT *mem_root= thd->mem_root;
- my_hash_walk_action action;
- DBUG_ENTER("mysql_xa_recover");
-
- field_list.push_back(new (mem_root)
- Item_int(thd, "formatID", 0,
- MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
- field_list.push_back(new (mem_root)
- Item_int(thd, "gtrid_length", 0,
- MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
- field_list.push_back(new (mem_root)
- Item_int(thd, "bqual_length", 0,
- MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
- {
- uint len;
- CHARSET_INFO *cs;
-
- if (thd->lex->verbose)
- {
- len= SQL_XIDSIZE;
- cs= &my_charset_utf8_general_ci;
- action= (my_hash_walk_action) xa_recover_callback_verbose;
- }
- else
- {
- len= XIDDATASIZE;
- cs= &my_charset_bin;
- action= (my_hash_walk_action) xa_recover_callback_short;
- }
-
- field_list.push_back(new (mem_root)
- Item_empty_string(thd, "data", len, cs), mem_root);
- }
-
- if (protocol->send_result_set_metadata(&field_list,
- Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
- DBUG_RETURN(1);
-
- if (xid_cache_iterate(thd, action, protocol))
- DBUG_RETURN(1);
- my_eof(thd);
- DBUG_RETURN(0);
-}
/*
Called by engine to notify TC that a new commit checkpoint has been reached.
@@ -2381,11 +2368,26 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv)
{
int err;
handlerton *ht= ha_info->ht();
+#ifdef WITH_WSREP
+ if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION)
+ {
+ WSREP_DEBUG("ha_rollback_to_savepoint: run before_rollbackha_rollback_trans hook");
+ (void) wsrep_before_rollback(thd, !thd->in_sub_stmt);
+
+ }
+#endif // WITH_WSREP
if ((err= ht->rollback(ht, thd, !thd->in_sub_stmt)))
{ // cannot happen
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
error=1;
}
+#ifdef WITH_WSREP
+ if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION)
+ {
+ WSREP_DEBUG("ha_rollback_to_savepoint: run after_rollback hook");
+ (void) wsrep_after_rollback(thd, !thd->in_sub_stmt);
+ }
+#endif // WITH_WSREP
status_var_increment(thd->status_var.ha_rollback_count);
ha_info_next= ha_info->next();
ha_info->reset(); /* keep it conveniently zero-filled */
@@ -2402,6 +2404,16 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv)
*/
int ha_savepoint(THD *thd, SAVEPOINT *sv)
{
+#ifdef WITH_WSREP
+ /*
+ Register binlog hton for savepoint processing if wsrep binlog
+ emulation is on.
+ */
+ if (WSREP_EMULATE_BINLOG(thd) && wsrep_thd_is_local(thd))
+ {
+ wsrep_register_binlog_handler(thd, thd->in_multi_stmt_transaction_mode());
+ }
+#endif /* WITH_WSREP */
int error=0;
THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction.stmt :
&thd->transaction.all);
@@ -2628,7 +2640,7 @@ int ha_delete_table(THD *thd, handlerton *table_type, const char *path,
dummy_share.table_name= *alias;
dummy_table.alias.set(alias->str, alias->length, table_alias_charset);
file->change_table_ptr(&dummy_table, &dummy_share);
- file->print_error(error, MYF(intercept ? ME_JUST_WARNING : 0));
+ file->print_error(error, MYF(intercept ? ME_WARNING : 0));
}
if (intercept)
error= 0;
@@ -2686,23 +2698,28 @@ LEX_CSTRING *handler::engine_name()
}
+/*
+ It is assumed that the value of the parameter 'ranges' can be only 0 or 1.
+ If ranges == 1 then the function returns the cost of index only scan
+ by the key 'index' of one range containing 'rows' key entries.
+ If ranges == 0 then the function returns only the cost of copying
+ those key entries into the engine buffers.
+*/
+
double handler::keyread_time(uint index, uint ranges, ha_rows rows)
{
- /*
- It is assumed that we will read trough the whole key range and that all
- key blocks are half full (normally things are much better). It is also
- assumed that each time we read the next key from the index, the handler
- performs a random seek, thus the cost is proportional to the number of
- blocks read. This model does not take into account clustered indexes -
- engines that support that (e.g. InnoDB) may want to overwrite this method.
- The model counts in the time to read index entries from cache.
- */
+ DBUG_ASSERT(ranges == 0 || ranges == 1);
size_t len= table->key_info[index].key_length + ref_length; // bytes per entry: key length plus ref_length
if (index == table->s->primary_key && table->file->primary_key_is_clustered())
len= table->s->stored_rec_length; // clustered primary key: use the stored record length instead
- double keys_per_block= (stats.block_size/2.0/len+1);
- return (rows + keys_per_block-1)/ keys_per_block +
- len*rows/(stats.block_size+1)/TIME_FOR_COMPARE ;
+ double cost= (double)rows*len/(stats.block_size+1)*IDX_BLOCK_COPY_COST; // copy cost; charged for both values of ranges
+ if (ranges)
+ {
+ uint keys_per_block= (uint) (stats.block_size/2.0/len+1); // block_size/2/len + 1, truncated to uint
+ ulonglong blocks= !rows ? 0 : (rows-1) / keys_per_block + 1; // rows / keys_per_block rounded up; 0 when rows is 0
+ cost+= blocks; // each block adds 1 to the cost
+ }
+ return cost;
}
void **handler::ha_data(THD *thd) const
@@ -3687,6 +3704,8 @@ void print_keydup_error(TABLE *table, KEY *key, const char *msg, myf errflag)
}
else
{
+ if (key->algorithm == HA_KEY_ALG_LONG_HASH)
+ setup_keyinfo_hash(key);
/* Table is opened and defined at this point */
key_unpack(&str,table, key);
uint max_length=MYSQL_ERRMSG_SIZE-(uint) strlen(msg);
@@ -3697,6 +3716,8 @@ void print_keydup_error(TABLE *table, KEY *key, const char *msg, myf errflag)
}
my_printf_error(ER_DUP_ENTRY, msg, errflag, str.c_ptr_safe(),
key->name.str);
+ if (key->algorithm == HA_KEY_ALG_LONG_HASH)
+ re_setup_keyinfo_hash(key);
}
}
@@ -3714,7 +3735,6 @@ void print_keydup_error(TABLE *table, KEY *key, myf errflag)
errflag);
}
-
/**
Print error that we got from handler function.
@@ -3736,7 +3756,7 @@ void handler::print_error(int error, myf errflag)
if (ha_thd()->transaction_rollback_request)
{
/* Ensure this becomes a true error */
- errflag&= ~(ME_JUST_WARNING | ME_JUST_INFO);
+ errflag&= ~(ME_WARNING | ME_NOTE);
}
int textno= -1; // impossible value
@@ -3871,14 +3891,14 @@ void handler::print_error(int error, myf errflag)
{
textno=ER_RECORD_FILE_FULL;
/* Write the error message to error log */
- errflag|= ME_NOREFRESH;
+ errflag|= ME_ERROR_LOG;
break;
}
case HA_ERR_INDEX_FILE_FULL:
{
textno=ER_INDEX_FILE_FULL;
/* Write the error message to error log */
- errflag|= ME_NOREFRESH;
+ errflag|= ME_ERROR_LOG;
break;
}
case HA_ERR_LOCK_WAIT_TIMEOUT:
@@ -4005,14 +4025,14 @@ void handler::print_error(int error, myf errflag)
if (unlikely(fatal_error))
{
/* Ensure this becomes a true error */
- errflag&= ~(ME_JUST_WARNING | ME_JUST_INFO);
+ errflag&= ~(ME_WARNING | ME_NOTE);
if ((debug_assert_if_crashed_table ||
global_system_variables.log_warnings > 1))
{
/*
Log error to log before we crash or if extended warnings are requested
*/
- errflag|= ME_NOREFRESH;
+ errflag|= ME_ERROR_LOG;
}
}
@@ -4184,7 +4204,8 @@ static bool update_frm_version(TABLE *table)
int4store(version, MYSQL_VERSION_ID);
- if ((result= (int)mysql_file_pwrite(file, (uchar*) version, 4, 51L, MYF_RW)))
+ if ((result= (int)mysql_file_pwrite(file, (uchar*) version, 4, 51L,
+ MYF(MY_WME+MY_NABP))))
goto err;
table->s->mysql_version= MYSQL_VERSION_ID;
@@ -4203,9 +4224,10 @@ err:
*/
uint handler::get_dup_key(int error)
{
- DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
- m_lock_type != F_UNLCK);
+ DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type != F_UNLCK);
DBUG_ENTER("handler::get_dup_key");
+ if (table->s->long_unique_table && table->file->errkey < table->s->keys)
+ DBUG_RETURN(table->file->errkey);
table->file->errkey = (uint) -1;
if (error == HA_ERR_FOUND_DUPP_KEY ||
error == HA_ERR_FOREIGN_DUPLICATE_KEY ||
@@ -4622,7 +4644,7 @@ handler::check_if_supported_inplace_alter(TABLE *altered_table,
DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
alter_table_operations inplace_offline_operations=
- ALTER_COLUMN_EQUAL_PACK_LENGTH |
+ ALTER_COLUMN_TYPE_CHANGE_BY_ENGINE |
ALTER_COLUMN_NAME |
ALTER_RENAME_COLUMN |
ALTER_CHANGE_COLUMN_DEFAULT |
@@ -4658,7 +4680,7 @@ handler::check_if_supported_inplace_alter(TABLE *altered_table,
DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
uint table_changes= (ha_alter_info->handler_flags &
- ALTER_COLUMN_EQUAL_PACK_LENGTH) ?
+ ALTER_COLUMN_TYPE_CHANGE_BY_ENGINE) ?
IS_EQUAL_PACK_LENGTH : IS_EQUAL_YES;
if (table->file->check_if_incompatible_data(create_info, table_changes)
== COMPATIBLE_DATA_YES)
@@ -4667,6 +4689,30 @@ handler::check_if_supported_inplace_alter(TABLE *altered_table,
DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
}
+Alter_inplace_info::Alter_inplace_info(HA_CREATE_INFO *create_info_arg, // constructor: stores the arguments and zero/null-initializes the remaining members
+ Alter_info *alter_info_arg,
+ KEY *key_info_arg, uint key_count_arg,
+ partition_info *modified_part_info_arg,
+ bool ignore_arg, bool error_non_empty)
+ : create_info(create_info_arg),
+ alter_info(alter_info_arg),
+ key_info_buffer(key_info_arg),
+ key_count(key_count_arg),
+ index_drop_count(0), // index drop/add counts start at 0 with null buffers
+ index_drop_buffer(nullptr),
+ index_add_count(0),
+ index_add_buffer(nullptr),
+ rename_keys(current_thd->mem_root), // dereferences current_thd, so it must be non-null when this constructor runs
+ handler_ctx(nullptr),
+ group_commit_ctx(nullptr),
+ handler_flags(0),
+ modified_part_info(modified_part_info_arg),
+ ignore(ignore_arg),
+ online(false),
+ unsupported_reason(nullptr),
+ error_if_not_empty(error_non_empty) // note the argument is named error_non_empty
+ {}
+
void Alter_inplace_info::report_unsupported_error(const char *not_supported,
const char *try_instead) const
{
@@ -4931,6 +4977,7 @@ void handler::get_dynamic_partition_info(PARTITION_STATS *stat_info,
stat_info->update_time= stats.update_time;
stat_info->check_time= stats.check_time;
stat_info->check_sum= stats.checksum;
+ stat_info->check_sum_null= stats.checksum_null;
}
@@ -5073,7 +5120,7 @@ int handler::calculate_checksum()
uchar null_mask= table->s->last_null_bit_pos
? 256 - (1 << table->s->last_null_bit_pos) : 0;
- table->use_all_columns();
+ table->use_all_stored_columns();
stats.checksum= 0;
if ((error= ha_rnd_init(1)))
@@ -5085,7 +5132,7 @@ int handler::calculate_checksum()
return HA_ERR_ABORTED_BY_USER;
ha_checksum row_crc= 0;
- error= table->file->ha_rnd_next(table->record[0]);
+ error= ha_rnd_next(table->record[0]);
if (error)
break;
@@ -5139,7 +5186,7 @@ int handler::calculate_checksum()
stats.checksum+= row_crc;
}
- table->file->ha_rnd_end();
+ ha_rnd_end();
return error == HA_ERR_END_OF_FILE ? 0 : error;
}
@@ -5209,7 +5256,7 @@ int ha_create_table(THD *thd, const char *path,
{
if (!thd->is_error())
my_error(ER_CANT_CREATE_TABLE, MYF(0), db, table_name, error);
- table.file->print_error(error, MYF(ME_JUST_WARNING));
+ table.file->print_error(error, MYF(ME_WARNING));
PSI_CALL_drop_table_share(temp_table, share.db.str, (uint)share.db.length,
share.table_name.str, (uint)share.table_name.length);
}
@@ -5970,27 +6017,75 @@ int handler::compare_key2(key_range *range) const
/**
ICP callback - to be called by an engine to check the pushed condition
*/
-extern "C" enum icp_result handler_index_cond_check(void* h_arg)
+extern "C" check_result_t handler_index_cond_check(void* h_arg)
{
handler *h= (handler*)h_arg;
THD *thd= h->table->in_use;
- enum icp_result res;
+ check_result_t res;
DEBUG_SYNC(thd, "handler_index_cond_check");
enum thd_kill_levels abort_at= h->has_transactions() ?
THD_ABORT_SOFTLY : THD_ABORT_ASAP;
if (thd_kill_level(thd) > abort_at)
- return ICP_ABORTED_BY_USER;
+ return CHECK_ABORTED_BY_USER;
if (h->end_range && h->compare_key2(h->end_range) > 0)
- return ICP_OUT_OF_RANGE;
+ return CHECK_OUT_OF_RANGE;
h->increment_statistics(&SSV::ha_icp_attempts);
- if ((res= h->pushed_idx_cond->val_int()? ICP_MATCH : ICP_NO_MATCH) ==
- ICP_MATCH)
+ if ((res= h->pushed_idx_cond->val_int()? CHECK_POS : CHECK_NEG) ==
+ CHECK_POS)
h->increment_statistics(&SSV::ha_icp_match);
return res;
}
+
+/**
+  Rowid filter callback - to be called by an engine.
+
+  Checks the rowid / primary key of the row whose data is about to be
+  fetched against the rowid filter pushed to the handler.
+
+  @param h_arg  the handler, passed as an opaque pointer
+
+  @retval CHECK_ABORTED_BY_USER  thd_kill_level() is above the abort level
+  @retval CHECK_OUT_OF_RANGE     the current key is beyond end_range
+  @retval CHECK_POS              the filter check succeeded for the rowid
+  @retval CHECK_NEG              the filter check failed for the rowid
+*/
+
+extern "C"
+check_result_t handler_rowid_filter_check(void *h_arg)
+{
+  handler *file= (handler*) h_arg;
+  TABLE *filtered_table= file->get_table();
+
+  /*
+    The killed and out-of-range tests are done here only when no index
+    condition is pushed; otherwise the pushed index condition check has
+    done them already.
+  */
+  if (!file->pushed_idx_cond)
+  {
+    THD *thd= file->table->in_use;
+    DEBUG_SYNC(thd, "handler_rowid_filter_check");
+
+    enum thd_kill_levels abort_at= THD_ABORT_ASAP;
+    if (file->has_transactions())
+      abort_at= THD_ABORT_SOFTLY;
+    if (thd_kill_level(thd) > abort_at)
+      return CHECK_ABORTED_BY_USER;
+
+    if (file->end_range && file->compare_key2(file->end_range) > 0)
+      return CHECK_OUT_OF_RANGE;
+  }
+
+  /* position() fills file->ref, which is what the filter is asked about */
+  file->position(filtered_table->record[0]);
+  if (file->pushed_rowid_filter->check((char*) file->ref))
+    return CHECK_POS;
+  return CHECK_NEG;
+}
+
+
+/**
+  Callback function for an engine to check whether the used rowid filter
+  has been already built.
+
+  @param h_arg  the handler as an opaque pointer; may be NULL
+
+  @return the handler's rowid_filter_is_active flag, or false when h_arg
+          is NULL
+*/
+
+extern "C" int handler_rowid_filter_is_active(void *h_arg)
+{
+  handler *file= (handler*) h_arg;
+  return file ? file->rowid_filter_is_active : false;
+}
+
+
int handler::index_read_idx_map(uchar * buf, uint index, const uchar * key,
key_part_map keypart_map,
enum ha_rkey_function find_flag)
@@ -6171,6 +6266,12 @@ bool handler::check_table_binlog_row_based(bool binlog_row)
return false;
if (unlikely((table->in_use->variables.sql_log_bin_off)))
return 0; /* Called by partitioning engine */
+#ifdef WITH_WSREP
+ if (!table->in_use->variables.sql_log_bin &&
+ wsrep_thd_is_applying(table->in_use))
+ return 0; /* wsrep patch sets sql_log_bin to silence binlogging
+ from high priority threads */
+#endif /* WITH_WSREP */
if (unlikely((!check_table_binlog_row_based_done)))
{
check_table_binlog_row_based_done= 1;
@@ -6201,12 +6302,12 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row)
Otherwise, return 'true' if binary logging is on.
*/
IF_WSREP(((WSREP_EMULATE_BINLOG(thd) &&
- (thd->wsrep_exec_mode != REPL_RECV)) ||
+ wsrep_thd_is_local(thd)) ||
((WSREP(thd) ||
(thd->variables.option_bits & OPTION_BIN_LOG)) &&
mysql_bin_log.is_open())),
- (thd->variables.option_bits & OPTION_BIN_LOG) &&
- mysql_bin_log.is_open()));
+ (thd->variables.option_bits & OPTION_BIN_LOG) &&
+ mysql_bin_log.is_open()));
}
@@ -6243,7 +6344,9 @@ static int write_locked_table_maps(THD *thd)
MYSQL_LOCK *locks[2];
locks[0]= thd->extra_lock;
locks[1]= thd->lock;
- my_bool with_annotate= thd->variables.binlog_annotate_row_events &&
+ my_bool with_annotate= IF_WSREP(!wsrep_fragments_certified_for_stmt(thd),
+ true) &&
+ thd->variables.binlog_annotate_row_events &&
thd->query() && thd->query_length();
for (uint i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i )
@@ -6331,23 +6434,9 @@ int binlog_log_row(TABLE* table, const uchar *before_record,
/* only InnoDB tables will be replicated through binlog emulation */
if ((WSREP_EMULATE_BINLOG(thd) &&
- table->file->partition_ht()->db_type != DB_TYPE_INNODB) ||
- (thd->wsrep_ignore_table == true))
+ !(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) ||
+ thd->wsrep_ignore_table == true)
return 0;
-
- /* enforce wsrep_max_ws_rows */
- if (WSREP(thd) && table->s->tmp_table == NO_TMP_TABLE)
- {
- thd->wsrep_affected_rows++;
- if (wsrep_max_ws_rows &&
- thd->wsrep_exec_mode != REPL_RECV &&
- thd->wsrep_affected_rows > wsrep_max_ws_rows)
- {
- trans_rollback_stmt(thd) || trans_rollback(thd);
- my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
- return ER_ERROR_DURING_COMMIT;
- }
- }
#endif
if (!table->file->check_table_binlog_row_based(1))
@@ -6455,12 +6544,194 @@ int handler::ha_reset()
/* Reset information about pushed engine conditions */
cancel_pushed_idx_cond();
/* Reset information about pushed index conditions */
+ cancel_pushed_rowid_filter();
clear_top_table_fields();
DBUG_RETURN(reset());
}
+#ifdef WITH_WSREP
+/**
+  Per-row wsrep step: count the row against wsrep_max_ws_rows and then
+  call the two-argument wsrep_after_row() hook.
+
+  @param thd  thread whose wsrep_affected_rows counter is incremented
+
+  @retval 0                       nothing to report
+  @retval ER_ERROR_DURING_COMMIT  a local thread exceeded wsrep_max_ws_rows;
+                                  a rollback was issued and the error raised
+  @retval ER_LOCK_DEADLOCK        wsrep_after_row(thd, false) returned non-zero
+*/
+static int wsrep_after_row(THD *thd)
+{
+  DBUG_ENTER("wsrep_after_row");
+
+  /* enforce wsrep_max_ws_rows */
+  thd->wsrep_affected_rows++;
+  const bool too_many_rows= wsrep_max_ws_rows &&
+                            wsrep_thd_is_local(thd) &&
+                            thd->wsrep_affected_rows > wsrep_max_ws_rows;
+  if (too_many_rows)
+  {
+    /* trans_rollback() runs only when trans_rollback_stmt() returns false */
+    if (!trans_rollback_stmt(thd))
+      trans_rollback(thd);
+    my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
+    DBUG_RETURN(ER_ERROR_DURING_COMMIT);
+  }
+
+  if (wsrep_after_row(thd, false))
+    DBUG_RETURN(ER_LOCK_DEADLOCK);
+
+  DBUG_RETURN(0);
+}
+#endif /* WITH_WSREP */
+
+/**
+  Check whether a record duplicates an existing row in one long unique
+  (hash based) key.
+
+  The key value built from new_rec is looked up in index key_no through
+  handler h. Every row found under that key value is then compared with
+  the new record on the arguments of the hash expression, because an
+  equal hash alone does not prove that the values are equal.
+
+  table->record[0] is used as the read buffer for the index calls; it is
+  saved with store_record() before and put back with restore_record()
+  after the lookup.
+
+  @param table    table being modified
+  @param h        handler used for the index lookup
+  @param new_rec  the record to check
+  @param key_no   number of the long unique key in table->key_info
+
+  @retval 0                      no duplicate (also when the hash field
+                                 is NULL)
+  @retval HA_ERR_FOUND_DUPP_KEY  duplicate found; table->file->errkey is
+                                 set to key_no
+  @retval HA_ERR_OUT_OF_MEM      the save buffer could not be allocated
+  @retval other                  error from ha_index_init() or
+                                 ha_index_read_map()
+*/
+static int check_duplicate_long_entry_key(TABLE *table, handler *h,
+                                          const uchar *new_rec, uint key_no)
+{
+  Field *hash_field;
+  int result, error= 0;
+  KEY *key_info= table->key_info + key_no;
+  hash_field= key_info->key_part->field;
+  uchar ptr[HA_HASH_KEY_LENGTH_WITH_NULL];
+
+  DBUG_ASSERT((key_info->flags & HA_NULL_PART_KEY &&
+               key_info->key_length == HA_HASH_KEY_LENGTH_WITH_NULL)
+              || key_info->key_length == HA_HASH_KEY_LENGTH_WITHOUT_NULL);
+
+  /* A NULL hash value is never treated as a duplicate */
+  if (hash_field->is_real_null())
+    return 0;
+
+  key_copy(ptr, new_rec, key_info, key_info->key_length, false);
+
+  if (!table->check_unique_buf)
+  {
+    table->check_unique_buf= (uchar *)alloc_root(&table->mem_root,
+                                                 table->s->reclength);
+    /* Do not let store_record() below copy into a failed allocation */
+    if (!table->check_unique_buf)
+      return HA_ERR_OUT_OF_MEM;
+  }
+
+  result= h->ha_index_init(key_no, 0);
+  if (result)
+    return result;
+  store_record(table, check_unique_buf);
+  result= h->ha_index_read_map(table->record[0],
+                               ptr, HA_WHOLE_KEY, HA_READ_KEY_EXACT);
+  if (!result)
+  {
+    bool is_same;
+    Field * t_field;
+    Item_func_hash * temp= (Item_func_hash *)hash_field->vcol_info->expr;
+    Item ** arguments= temp->arguments();
+    uint arg_count= temp->argument_count();
+    do
+    {
+      /* Offset from new_rec to the saved copy of the record */
+      my_ptrdiff_t diff= table->check_unique_buf - new_rec;
+      is_same= true;
+      for (uint j=0; is_same && j < arg_count; j++)
+      {
+        DBUG_ASSERT(arguments[j]->type() == Item::FIELD_ITEM ||
+                    // this one for left(fld_name,length)
+                    arguments[j]->type() == Item::FUNC_ITEM);
+        if (arguments[j]->type() == Item::FIELD_ITEM)
+        {
+          t_field= static_cast<Item_field *>(arguments[j])->field;
+          if (t_field->cmp_offset(diff))
+            is_same= false;
+        }
+        else
+        {
+          /* Prefix key part: compare only the first 'length' of the field */
+          Item_func_left *fnc= static_cast<Item_func_left *>(arguments[j]);
+          DBUG_ASSERT(!my_strcasecmp(system_charset_info, "left", fnc->func_name()));
+          DBUG_ASSERT(fnc->arguments()[0]->type() == Item::FIELD_ITEM);
+          t_field= static_cast<Item_field *>(fnc->arguments()[0])->field;
+          uint length= (uint)fnc->arguments()[1]->val_int();
+          if (t_field->cmp_prefix(t_field->ptr, t_field->ptr + diff, length))
+            is_same= false;
+        }
+      }
+    }
+    /* Keep walking the rows under this key value until one matches */
+    while (!is_same && !(result= h->ha_index_next_same(table->record[0],
+                                                       ptr, key_info->key_length)));
+    if (is_same)
+      error= HA_ERR_FOUND_DUPP_KEY;
+    goto exit;
+  }
+  if (result != HA_ERR_KEY_NOT_FOUND)
+    error= result;
+exit:
+  if (error == HA_ERR_FOUND_DUPP_KEY)
+  {
+    table->file->errkey= key_no;
+    /* Record where the conflicting row is, for engines that report it */
+    if (h->ha_table_flags() & HA_DUPLICATE_POS)
+    {
+      h->position(table->record[0]);
+      memcpy(table->file->dup_ref, h->ref, h->ref_length);
+    }
+  }
+  restore_record(table, check_unique_buf);
+  h->ha_index_end();
+  return error;
+}
+
+/** @brief
+  Check whether a record to be inserted breaks the unique constraint
+  of any long unique key of the table.
+
+  table->file->errkey is reset to -1 before the keys are examined.
+
+  @param table    table receiving the row
+  @param h        handler used for the index lookups
+  @param new_rec  the record to be inserted
+
+  @returns 0 if no duplicate, else the first non-zero result of
+           check_duplicate_long_entry_key()
+ */
+static int check_duplicate_long_entries(TABLE *table, handler *h,
+                                        const uchar *new_rec)
+{
+  table->file->errkey= -1;
+  for (uint key_no= 0; key_no < table->s->keys; key_no++)
+  {
+    /* Only keys using the long hash algorithm need this check */
+    if (table->key_info[key_no].algorithm != HA_KEY_ALG_LONG_HASH)
+      continue;
+    if (int result= check_duplicate_long_entry_key(table, h, new_rec, key_no))
+      return result;
+  }
+  return 0;
+}
+
+/** @brief
+  Check whether an updated record breaks the unique constraint of a
+  long unique key.
+
+  For an update only the specific keys whose fields were changed are
+  checked. Consider
+    create table t1(a blob, b blob, x blob, y blob, unique(a,b), unique(x,y))
+  and
+    update t1 set a=23+a;
+  Here the data of key (x,y) is the same in the old and the new record,
+  so an index lookup on it would return 0; such keys are skipped.
+
+  The lookups go through table->update_handler; parameter h is not used
+  by this function.
+
+  @param table    table being updated
+  @param h        not used
+  @param new_rec  the new version of the record
+
+  @returns 0 if no duplicate, else the error returned by
+           check_duplicate_long_entry_key()
+ */
+static int check_duplicate_long_entries_update(TABLE *table, handler *h, uchar *new_rec)
+{
+  /*
+    Offset between the two record buffers, used to compare each field of
+    the hash key in the new record with its value in the old record.
+  */
+  uint reclength= (uint) (table->record[1] - table->record[0]);
+  table->clone_handler_for_update();
+
+  for (uint key_no= 0; key_no < table->s->keys; key_no++)
+  {
+    KEY *keyinfo= table->key_info + key_no;
+    if (keyinfo->algorithm != HA_KEY_ALG_LONG_HASH)
+      continue;
+
+    uint key_parts= fields_in_hash_keyinfo(keyinfo);
+    KEY_PART_INFO *keypart= keyinfo->key_part - key_parts;
+    for (uint part= 0; part < key_parts; part++, keypart++)
+    {
+      Field *field= keypart->field;
+      /* An unchanged field gives no reason to check this key */
+      if (!field->cmp_binary_offset(reclength))
+        continue;
+
+      int error= check_duplicate_long_entry_key(table, table->update_handler,
+                                                new_rec, key_no);
+      if (error)
+        return error;
+      /*
+        One call checks the whole key, so the remaining fields of this
+        key need not be looked at.
+      */
+      break;
+    }
+  }
+  return 0;
+}
-int handler::ha_write_row(uchar *buf)
+int handler::ha_write_row(const uchar *buf)
{
int error;
Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
@@ -6473,6 +6744,14 @@ int handler::ha_write_row(uchar *buf)
mark_trx_read_write();
increment_statistics(&SSV::ha_write_count);
+ if (table->s->long_unique_table)
+ {
+ if (this->inited == RND)
+ table->clone_handler_for_update();
+ handler *h= table->update_handler ? table->update_handler : table->file;
+ if ((error= check_duplicate_long_entries(table, h, buf)))
+ DBUG_RETURN(error);
+ }
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_WRITE_ROW, MAX_KEY, 0,
{ error= write_row(buf); })
@@ -6481,7 +6760,15 @@ int handler::ha_write_row(uchar *buf)
{
rows_changed++;
error= binlog_log_row(table, 0, buf, log_func);
+#ifdef WITH_WSREP
+ if (table_share->tmp_table == NO_TMP_TABLE &&
+ WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
+ {
+ DBUG_RETURN(error);
+ }
+#endif /* WITH_WSREP */
}
+
DEBUG_SYNC_C("ha_write_row_end");
DBUG_RETURN(error);
}
@@ -6504,6 +6791,11 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
mark_trx_read_write();
increment_statistics(&SSV::ha_update_count);
+ if (table->s->long_unique_table &&
+ (error= check_duplicate_long_entries_update(table, table->file, (uchar *)new_data)))
+ {
+ return error;
+ }
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_UPDATE_ROW, active_index, 0,
{ error= update_row(old_data, new_data);})
@@ -6513,6 +6805,13 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
{
rows_changed++;
error= binlog_log_row(table, old_data, new_data, log_func);
+#ifdef WITH_WSREP
+ if (table_share->tmp_table == NO_TMP_TABLE &&
+ WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
+ {
+ return error;
+ }
+#endif /* WITH_WSREP */
}
return error;
}
@@ -6521,7 +6820,7 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
Update first row. Only used by sequence tables
*/
-int handler::update_first_row(uchar *new_data)
+int handler::update_first_row(const uchar *new_data)
{
int error;
if (likely(!(error= ha_rnd_init(1))))
@@ -6568,6 +6867,13 @@ int handler::ha_delete_row(const uchar *buf)
{
rows_changed++;
error= binlog_log_row(table, buf, 0, log_func);
+#ifdef WITH_WSREP
+ if (table_share->tmp_table == NO_TMP_TABLE &&
+ WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
+ {
+ return error;
+ }
+#endif /* WITH_WSREP */
}
return error;
}
@@ -6587,14 +6893,14 @@ int handler::ha_delete_row(const uchar *buf)
@retval != 0 Failure.
*/
-int handler::ha_direct_update_rows(ha_rows *update_rows)
+int handler::ha_direct_update_rows(ha_rows *update_rows, ha_rows *found_rows)
{
int error;
MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
mark_trx_read_write();
- error = direct_update_rows(update_rows);
+ error = direct_update_rows(update_rows, found_rows);
MYSQL_UPDATE_ROW_DONE(error);
return error;
}
@@ -6727,6 +7033,20 @@ void handler::set_lock_type(enum thr_lock_type lock)
table->reginfo.lock_type= lock;
}
+/**
+  Compare a key part of the old table definition with the new one.
+
+  @retval Compare_keys::Equal     old_field.is_equal(new_field) holds and
+                                  both key parts have the same length
+  @retval Compare_keys::NotEqual  otherwise
+*/
+Compare_keys handler::compare_key_parts(const Field &old_field,
+                                        const Column_definition &new_field,
+                                        const KEY_PART_INFO &old_part,
+                                        const KEY_PART_INFO &new_part) const
+{
+  const bool same= old_field.is_equal(new_field) &&
+                   old_part.length == new_part.length;
+  return same ? Compare_keys::Equal : Compare_keys::NotEqual;
+}
+
#ifdef WITH_WSREP
/**
@details
@@ -6757,7 +7077,7 @@ int ha_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal)
DBUG_ENTER("ha_abort_transaction");
if (!WSREP(bf_thd) &&
!(bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU &&
- bf_thd->wsrep_exec_mode == TOTAL_ORDER)) {
+ wsrep_thd_is_toi(bf_thd))) {
DBUG_RETURN(0);
}
@@ -6773,54 +7093,6 @@ int ha_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal)
DBUG_RETURN(0);
}
-
-void ha_fake_trx_id(THD *thd)
-{
- DBUG_ENTER("ha_fake_trx_id");
-
- bool no_fake_trx_id= true;
-
- if (!WSREP(thd))
- {
- DBUG_VOID_RETURN;
- }
-
- if (thd->wsrep_ws_handle.trx_id != WSREP_UNDEFINED_TRX_ID)
- {
- WSREP_DEBUG("fake trx id skipped: %" PRIu64, thd->wsrep_ws_handle.trx_id);
- DBUG_VOID_RETURN;
- }
-
- /* Try statement transaction if standard one is not set. */
- THD_TRANS *trans= (thd->transaction.all.ha_list) ? &thd->transaction.all :
- &thd->transaction.stmt;
-
- Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
-
- for (; ha_info; ha_info= ha_info_next)
- {
- handlerton *hton= ha_info->ht();
- if (hton->fake_trx_id)
- {
- hton->fake_trx_id(hton, thd);
-
- /* Got a fake trx id. */
- no_fake_trx_id= false;
-
- /*
- We need transaction ID from just one storage engine providing
- fake_trx_id (which will most likely be the case).
- */
- break;
- }
- ha_info_next= ha_info->next();
- }
-
- if (unlikely(no_fake_trx_id))
- WSREP_WARN("Cannot get fake transaction ID from storage engine.");
-
- DBUG_VOID_RETURN;
-}
#endif /* WITH_WSREP */
@@ -7108,6 +7380,10 @@ int del_global_index_stat(THD *thd, TABLE* table, KEY* key_info)
DBUG_RETURN(res);
}
+/*****************************************************************************
+ VERSIONING functions
+******************************************************************************/
+
bool Vers_parse_info::is_start(const char *name) const
{
DBUG_ASSERT(name);
@@ -7181,8 +7457,8 @@ bool Vers_parse_info::fix_implicit(THD *thd, Alter_info *alter_info)
alter_info->flags|= ALTER_PARSER_ADD_COLUMN;
- system_time= start_end_t(default_start, default_end);
- as_row= system_time;
+ period= start_end_t(default_start, default_end);
+ as_row= period;
if (vers_create_sys_field(thd, default_start, alter_info, VERS_SYS_START_FLAG) ||
vers_create_sys_field(thd, default_end, alter_info, VERS_SYS_END_FLAG))
@@ -7301,7 +7577,7 @@ bool Vers_parse_info::fix_alter_info(THD *thd, Alter_info *alter_info,
if (DBUG_EVALUATE_IF("sysvers_force", 0, share->tmp_table))
{
- my_error(ER_VERS_TEMPORARY, MYF(0));
+ my_error(ER_VERS_NOT_SUPPORTED, MYF(0), "CREATE TEMPORARY TABLE");
return true;
}
@@ -7371,7 +7647,7 @@ bool Vers_parse_info::fix_alter_info(THD *thd, Alter_info *alter_info,
DBUG_ASSERT(end.str);
as_row= start_end_t(start, end);
- system_time= as_row;
+ period= as_row;
if (alter_info->create_list.elements)
{
@@ -7466,7 +7742,7 @@ Vers_parse_info::fix_create_like(Alter_info &alter_info, HA_CREATE_INFO &create_
}
as_row= start_end_t(f_start->field_name, f_end->field_name);
- system_time= as_row;
+ period= as_row;
create_info.options|= HA_VERSIONED_TABLE;
return false;
@@ -7491,14 +7767,14 @@ bool Vers_parse_info::check_conditions(const Lex_table_name &table_name,
return true;
}
- if (!system_time.start || !system_time.end)
+ if (!period.start || !period.end)
{
my_error(ER_MISSING, MYF(0), table_name.str, "PERIOD FOR SYSTEM_TIME");
return true;
}
- if (!as_row.start.streq(system_time.start) ||
- !as_row.end.streq(system_time.end))
+ if (!as_row.start.streq(period.start) ||
+ !as_row.end.streq(period.end))
{
my_error(ER_VERS_PERIOD_COLUMNS, MYF(0), as_row.start.str, as_row.end.str);
return true;
@@ -7592,3 +7868,109 @@ bool Vers_parse_info::check_sys_fields(const Lex_table_name &table_name,
return false;
}
+
+/**
+  Check that a column can serve as a boundary of this period.
+
+  @param f       the column definition, or NULL when it was not found
+  @param f_name  the column name, used in the error for a missing column
+
+  @retval false  the column is acceptable
+  @retval true   an error was raised: the column is missing, its timestamp
+                 type is neither DATE nor DATETIME, or it has vcol_info or
+                 the VERS_SYSTEM_FIELD flag
+*/
+bool Table_period_info::check_field(const Create_field* f,
+                                    const Lex_ident& f_name) const
+{
+  if (!f)
+  {
+    my_error(ER_BAD_FIELD_ERROR, MYF(0), f_name.str, name.str);
+    return true;
+  }
+
+  if (f->type_handler()->mysql_timestamp_type() != MYSQL_TIMESTAMP_DATE &&
+      f->type_handler()->mysql_timestamp_type() != MYSQL_TIMESTAMP_DATETIME)
+  {
+    my_error(ER_WRONG_FIELD_SPEC, MYF(0), f->field_name.str);
+    return true;
+  }
+
+  if (f->vcol_info || f->flags & VERS_SYSTEM_FIELD)
+  {
+    my_error(ER_PERIOD_FIELD_WRONG_ATTRIBUTES, MYF(0),
+             f->field_name.str, "GENERATED ALWAYS AS");
+    return true;
+  }
+
+  return false;
+}
+
+/**
+  Run the system versioning field check and, if it passes, the period
+  field check.
+
+  @retval false  both checks passed
+  @retval true   one of the checks failed; the period check is not run
+                 when the versioning check fails
+*/
+bool Table_scope_and_contents_source_st::check_fields(
+  THD *thd, Alter_info *alter_info,
+  const Lex_table_name &table_name, const Lex_table_name &db, int select_count)
+{
+  if (vers_check_system_fields(thd, alter_info, table_name, db, select_count))
+    return true;
+  return check_period_fields(thd, alter_info);
+}
+
+/**
+  Validate the columns of the table period, if one is defined.
+
+  Nothing is checked when period_info has no name. Otherwise an error is
+  raised when the table is temporary, when a column has the same name as
+  the period, when either boundary column fails
+  Table_period_info::check_field(), or when the two boundary columns
+  differ in type handler or length.
+
+  @retval false  no period, or the period columns are valid
+  @retval true   an error was raised
+*/
+bool Table_scope_and_contents_source_st::check_period_fields(
+  THD *thd, Alter_info *alter_info)
+{
+  if (!period_info.name)
+    return false;
+
+  if (tmp_table())
+  {
+    my_error(ER_PERIOD_TEMPORARY_NOT_ALLOWED, MYF(0));
+    return true;
+  }
+
+  Table_period_info::start_end_t &period= period_info.period;
+  const Create_field *start_field= NULL;
+  const Create_field *end_field= NULL;
+  List_iterator<Create_field> field_it(alter_info->create_list);
+  while (const Create_field *cur= field_it++)
+  {
+    /* Remember the definitions of the two boundary columns */
+    if (period.start.streq(cur->field_name))
+      start_field= cur;
+    else if (period.end.streq(cur->field_name))
+      end_field= cur;
+
+    /* A column may not share its name with the period */
+    if (period_info.name.streq(cur->field_name))
+    {
+      my_error(ER_DUP_FIELDNAME, MYF(0), cur->field_name.str);
+      return true;
+    }
+  }
+
+  if (period_info.check_field(start_field, period.start.str) ||
+      period_info.check_field(end_field, period.end.str))
+    return true;
+
+  const bool same_type= start_field->type_handler() == end_field->type_handler();
+  if (same_type && start_field->length == end_field->length)
+    return false;
+
+  my_error(ER_PERIOD_TYPES_MISMATCH, MYF(0), period_info.name.str);
+  return true;
+}
+
+/**
+  Fix the system versioning fields and, if that succeeds, the period
+  fields of the create list.
+
+  @retval false  both steps succeeded
+  @retval true   a step failed; fix_period_fields() is not run when
+                 vers_fix_system_fields() fails
+*/
+bool
+Table_scope_and_contents_source_st::fix_create_fields(THD *thd,
+                                                      Alter_info *alter_info,
+                                                      const TABLE_LIST &create_table)
+{
+  if (vers_fix_system_fields(thd, alter_info, create_table))
+    return true;
+  return fix_period_fields(thd, alter_info);
+}
+
+/**
+  Mark the boundary columns of the table period in the create list.
+
+  Each column named as the period start or end gets its period pointer
+  set to period_info and the NOT_NULL_FLAG added. Nothing is done when
+  period_info has no name.
+
+  @return always false
+*/
+bool
+Table_scope_and_contents_source_st::fix_period_fields(THD *thd,
+                                                      Alter_info *alter_info)
+{
+  if (!period_info.name)
+    return false;
+
+  Table_period_info::start_end_t &period= period_info.period;
+  List_iterator<Create_field> field_it(alter_info->create_list);
+  while (Create_field *cur= field_it++)
+  {
+    if (!period.start.streq(cur->field_name) &&
+        !period.end.streq(cur->field_name))
+      continue;
+    cur->period= &period_info;
+    cur->flags|= NOT_NULL_FLAG;
+  }
+  return false;
+}