summaryrefslogtreecommitdiff
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r--storage/innobase/dict/dict0dict.cc21
-rw-r--r--storage/innobase/handler/ha_innodb.cc1372
-rw-r--r--storage/innobase/handler/ha_innodb.h42
-rw-r--r--storage/innobase/handler/handler0alter.cc4
-rw-r--r--storage/innobase/include/dict0mem.h3
-rw-r--r--storage/innobase/include/ha_prototypes.h13
-rw-r--r--storage/innobase/include/lock0lock.h14
-rw-r--r--storage/innobase/include/rem0rec.h9
-rw-r--r--storage/innobase/include/srv0srv.h12
-rw-r--r--storage/innobase/include/sync0sync.ic3
-rw-r--r--storage/innobase/include/trx0sys.h33
-rw-r--r--storage/innobase/include/trx0sys.ic3
-rw-r--r--storage/innobase/include/trx0trx.h3
-rw-r--r--storage/innobase/lock/lock0lock.cc327
-rw-r--r--storage/innobase/rem/rem0rec.cc133
-rw-r--r--storage/innobase/row/row0ins.cc52
-rw-r--r--storage/innobase/row/row0upd.cc288
-rw-r--r--storage/innobase/srv/srv0conc.cc93
-rw-r--r--storage/innobase/srv/srv0srv.cc71
-rw-r--r--storage/innobase/trx/trx0roll.cc16
-rw-r--r--storage/innobase/trx/trx0sys.cc129
-rw-r--r--storage/innobase/trx/trx0trx.cc38
-rw-r--r--storage/xtradb/dict/dict0dict.cc24
-rw-r--r--storage/xtradb/handler/ha_innodb.cc1371
-rw-r--r--storage/xtradb/handler/ha_innodb.h36
-rw-r--r--storage/xtradb/handler/handler0alter.cc4
-rw-r--r--storage/xtradb/include/dict0mem.h3
-rw-r--r--storage/xtradb/include/ha_prototypes.h15
-rw-r--r--storage/xtradb/include/lock0lock.h4
-rw-r--r--storage/xtradb/include/rem0rec.h9
-rw-r--r--storage/xtradb/include/srv0srv.h9
-rw-r--r--storage/xtradb/include/trx0sys.h35
-rw-r--r--storage/xtradb/include/trx0trx.h3
-rw-r--r--storage/xtradb/lock/lock0lock.cc256
-rw-r--r--storage/xtradb/os/os0file.cc8
-rw-r--r--storage/xtradb/rem/rem0rec.cc138
-rw-r--r--storage/xtradb/row/row0ins.cc52
-rw-r--r--storage/xtradb/row/row0upd.cc234
-rw-r--r--storage/xtradb/srv/srv0conc.cc33
-rw-r--r--storage/xtradb/srv/srv0srv.cc32
-rw-r--r--storage/xtradb/trx/trx0roll.cc16
-rw-r--r--storage/xtradb/trx/trx0sys.cc93
-rw-r--r--storage/xtradb/trx/trx0trx.cc43
43 files changed, 5045 insertions, 52 deletions
diff --git a/storage/innobase/dict/dict0dict.cc b/storage/innobase/dict/dict0dict.cc
index a560dc54eac..5bcc3c58802 100644
--- a/storage/innobase/dict/dict0dict.cc
+++ b/storage/innobase/dict/dict0dict.cc
@@ -3158,7 +3158,26 @@ next_rec:
return(NULL);
}
-
+#ifdef WITH_WSREP
+dict_index_t*
+wsrep_dict_foreign_find_index(
+/*====================*/
+ dict_table_t* table, /*!< in: table */
+ const char** columns,/*!< in: array of column names */
+ ulint n_cols, /*!< in: number of columns */
+ dict_index_t* types_idx, /*!< in: NULL or an index to whose types the
+ column types must match */
+ ibool check_charsets,
+ /*!< in: whether to check charsets.
+ only has an effect if types_idx != NULL */
+ ulint check_null)
+ /*!< in: nonzero if none of the columns must
+ be declared NOT NULL */
+{
+ return dict_foreign_find_index(
+ table, columns, n_cols, types_idx, check_charsets, check_null);
+}
+#endif /* WITH_WSREP */
/**********************************************************************//**
Report an error in a foreign key definition. */
static
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index ee7ea4246f9..f776ba7d96b 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -85,6 +85,9 @@ this program; if not, write to the Free Software Foundation, Inc.,
#include "ha_prototypes.h"
#include "ut0mem.h"
#include "ibuf0ibuf.h"
+#ifdef WITH_WSREP
+#include "../storage/innobase/include/ut0byte.h"
+#endif /* WITH_WSREP */
#include "dict0dict.h"
#include "srv0mon.h"
#include "api0api.h"
@@ -114,6 +117,45 @@ this program; if not, write to the Free Software Foundation, Inc.,
# define MYSQL_PLUGIN_IMPORT /* nothing */
# endif /* MYSQL_PLUGIN_IMPORT */
+#ifdef WITH_WSREP
+#include <wsrep_mysqld.h>
+#include <my_md5.h>
+#if defined(HAVE_YASSL)
+#include "my_config.h"
+#include "md5.hpp"
+#elif defined(HAVE_OPENSSL)
+#include <openssl/md5.h>
+#endif
+
+
+#include "dict0priv.h"
+
+extern my_bool wsrep_certify_nonPK;
+class binlog_trx_data;
+extern handlerton *binlog_hton;
+
+extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log;
+extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_wsrep_rollback;
+extern MYSQL_PLUGIN_IMPORT mysql_cond_t COND_wsrep_rollback;
+extern MYSQL_PLUGIN_IMPORT wsrep_aborting_thd_t wsrep_aborting_thd;
+
+static inline wsrep_ws_handle_t*
+wsrep_ws_handle(THD* thd, const trx_t* trx) {
+ return wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd),
+ (wsrep_trx_id_t)trx->id);
+}
+
+extern bool wsrep_prepare_key_for_innodb(const uchar *cache_key,
+ size_t cache_key_len,
+ const uchar* row_id,
+ size_t row_id_len,
+ wsrep_buf_t* key,
+ size_t* key_len);
+
+extern handlerton * wsrep_hton;
+extern TC_LOG* tc_log;
+extern void wsrep_cleanup_transaction(THD *thd);
+#endif /* WITH_WSREP */
/** to protect innobase_open_files */
static mysql_mutex_t innobase_share_mutex;
/** to force correct commit order in binlog */
@@ -1151,6 +1193,10 @@ innobase_srv_conc_enter_innodb(
/*===========================*/
trx_t* trx) /*!< in: transaction handle */
{
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_brute_force(trx->mysql_thd)) return;
+#endif /* WITH_WSREP */
if (srv_thread_concurrency) {
if (trx->n_tickets_to_enter_innodb > 0) {
@@ -1185,6 +1231,10 @@ innobase_srv_conc_exit_innodb(
#ifdef UNIV_SYNC_DEBUG
ut_ad(!sync_thread_levels_nonempty_trx(trx->has_search_latch));
#endif /* UNIV_SYNC_DEBUG */
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_brute_force(trx->mysql_thd)) return;
+#endif /* WITH_WSREP */
/* This is to avoid making an unnecessary function call. */
if (trx->declared_to_be_inside_innodb
@@ -1305,6 +1355,15 @@ thd_to_trx(
{
return(*(trx_t**) thd_ha_data(thd, innodb_hton_ptr));
}
+#ifdef WITH_WSREP
+ulonglong
+thd_to_trx_id(
+/*=======*/
+ THD* thd) /*!< in: MySQL thread */
+{
+ return(thd_to_trx(thd)->id);
+}
+#endif
/********************************************************************//**
Call this function when mysqld passes control to the client. That is to
@@ -1334,6 +1393,15 @@ innobase_release_temporary_latches(
return(0);
}
+#ifdef WITH_WSREP
+static int
+wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
+ my_bool signal);
+static void
+wsrep_fake_trx_id(handlerton* hton, THD *thd);
+static int innobase_wsrep_set_checkpoint(handlerton* hton, const XID* xid);
+static int innobase_wsrep_get_checkpoint(handlerton* hton, XID* xid);
+#endif
/********************************************************************//**
Increments innobase_active_counter and every INNOBASE_WAKE_INTERVALth
time calls srv_active_wake_master_thread. This function should be used
@@ -1753,6 +1821,9 @@ int
innobase_mysql_tmpfile(void)
/*========================*/
{
+#ifdef WITH_INNODB_DISALLOW_WRITES
+ os_event_wait(srv_allow_writes_event);
+#endif /* WITH_INNODB_DISALLOW_WRITES */
int fd2 = -1;
File fd = mysql_tmpfile("ib");
@@ -2807,6 +2878,12 @@ innobase_init(
innobase_hton->release_temporary_latches =
innobase_release_temporary_latches;
+#ifdef WITH_WSREP
+ innobase_hton->wsrep_abort_transaction=wsrep_abort_transaction;
+ innobase_hton->wsrep_set_checkpoint=innobase_wsrep_set_checkpoint;
+ innobase_hton->wsrep_get_checkpoint=innobase_wsrep_get_checkpoint;
+ innobase_hton->wsrep_fake_trx_id=wsrep_fake_trx_id;
+#endif /* WITH_WSREP */
innobase_hton->kill_query = innobase_kill_query;
if (srv_file_per_table)
@@ -3399,10 +3476,30 @@ innobase_commit_low(
/*================*/
trx_t* trx) /*!< in: transaction handle */
{
+#ifdef WITH_WSREP
+ THD* thd = (THD*)trx->mysql_thd;
+ const char* tmp = 0;
+ if (wsrep_on((void*)thd)) {
+#ifdef WSREP_PROC_INFO
+ char info[64];
+ info[sizeof(info) - 1] = '\0';
+ snprintf(info, sizeof(info) - 1,
+ "innobase_commit_low():trx_commit_for_mysql(%lld)",
+ (long long) wsrep_thd_trx_seqno(thd));
+ tmp = thd_proc_info(thd, info);
+
+#else
+ tmp = thd_proc_info(thd, "innobase_commit_low()");
+#endif /* WSREP_PROC_INFO */
+ }
+#endif /* WITH_WSREP */
if (trx_is_started(trx)) {
trx_commit_for_mysql(trx);
}
+#ifdef WITH_WSREP
+ if (wsrep_on((void*)thd)) { thd_proc_info(thd, tmp); }
+#endif /* WITH_WSREP */
}
/*****************************************************************//**
@@ -4095,6 +4192,20 @@ innobase_kill_query(
DBUG_ENTER("innobase_kill_query");
DBUG_ASSERT(hton == innodb_hton_ptr);
+#ifdef WITH_WSREP
+ wsrep_thd_LOCK(thd);
+ if (wsrep_thd_conflict_state(thd) != NO_CONFLICT) {
+ /* if victim has been signaled by BF thread and/or aborting
+ is already progressing, following query aborting is not necessary
+ any more.
+ Also, BF thread should own trx mutex for the victim, which would
+ conflict with trx_mutex_enter() below
+ */
+ wsrep_thd_UNLOCK(thd);
+ DBUG_VOID_RETURN;
+ }
+ wsrep_thd_UNLOCK(thd);
+#endif /* WITH_WSREP */
trx = thd_to_trx(thd);
if (trx)
@@ -4262,7 +4373,11 @@ ha_innobase::max_supported_key_length() const
case 8192:
return(1536);
default:
+#ifdef WITH_WSREP
+ return(3500);
+#else
return(3500);
+#endif
}
}
@@ -5373,7 +5488,101 @@ get_field_offset(
{
return((uint) (field->ptr - table->record[0]));
}
+#ifdef WITH_WSREP
+UNIV_INTERN
+void
+wsrep_innobase_mysql_sort(
+/*===============*/
+ /* out: str contains sort string */
+ int mysql_type, /* in: MySQL type */
+ uint charset_number, /* in: number of the charset */
+ unsigned char* str, /* in: data field */
+ unsigned int str_length) /* in: data field length,
+ not UNIV_SQL_NULL */
+{
+ CHARSET_INFO* charset;
+ enum_field_types mysql_tp;
+
+ DBUG_ASSERT(str_length != UNIV_SQL_NULL);
+
+ mysql_tp = (enum_field_types) mysql_type;
+
+ switch (mysql_tp) {
+
+ case MYSQL_TYPE_BIT:
+ case MYSQL_TYPE_STRING:
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_VARCHAR:
+ {
+ uchar tmp_str[REC_VERSION_56_MAX_INDEX_COL_LEN];
+ uint tmp_length = REC_VERSION_56_MAX_INDEX_COL_LEN;
+
+ /* Use the charset number to pick the right charset struct for
+ the comparison. Since the MySQL function get_charset may be
+ slow before Bar removes the mutex operation there, we first
+ look at 2 common charsets directly. */
+
+ if (charset_number == default_charset_info->number) {
+ charset = default_charset_info;
+ } else if (charset_number == my_charset_latin1.number) {
+ charset = &my_charset_latin1;
+ } else {
+ charset = get_charset(charset_number, MYF(MY_WME));
+
+ if (charset == NULL) {
+ sql_print_error("InnoDB needs charset %lu for doing "
+ "a comparison, but MySQL cannot "
+ "find that charset.",
+ (ulong) charset_number);
+ ut_a(0);
+ }
+ }
+
+ ut_a(str_length <= tmp_length);
+ memcpy(tmp_str, str, str_length);
+
+ //tmp_length = charset->coll->strnxfrm(charset, str, str_length,
+ // tmp_str, str_length);
+ /* Note: in MySQL 5.6:
+ */
+ tmp_length = charset->coll->strnxfrm(charset, str, str_length,
+ str_length, tmp_str, tmp_length, 0);
+ /**/
+ DBUG_ASSERT(tmp_length == str_length);
+
+ break;
+ }
+ case MYSQL_TYPE_DECIMAL :
+ case MYSQL_TYPE_TINY :
+ case MYSQL_TYPE_SHORT :
+ case MYSQL_TYPE_LONG :
+ case MYSQL_TYPE_FLOAT :
+ case MYSQL_TYPE_DOUBLE :
+ case MYSQL_TYPE_NULL :
+ case MYSQL_TYPE_TIMESTAMP :
+ case MYSQL_TYPE_LONGLONG :
+ case MYSQL_TYPE_INT24 :
+ case MYSQL_TYPE_DATE :
+ case MYSQL_TYPE_TIME :
+ case MYSQL_TYPE_DATETIME :
+ case MYSQL_TYPE_YEAR :
+ case MYSQL_TYPE_NEWDATE :
+ case MYSQL_TYPE_NEWDECIMAL :
+ case MYSQL_TYPE_ENUM :
+ case MYSQL_TYPE_SET :
+ case MYSQL_TYPE_GEOMETRY :
+ break;
+ default:
+ break;
+ }
+ return;
+}
+#endif // WITH_WSREP
/*************************************************************//**
InnoDB uses this function to compare two data fields for which the data type
is such that we must use MySQL code to compare them. NOTE that the prototype
@@ -5897,6 +6106,264 @@ innobase_read_from_2_little_endian(
/*******************************************************************//**
Stores a key value for a row to a buffer.
@return key value length as stored in buff */
+#ifdef WITH_WSREP
+UNIV_INTERN
+uint
+wsrep_store_key_val_for_row(
+/*===============================*/
+ TABLE* table,
+ uint keynr, /*!< in: key number */
+ char* buff, /*!< in/out: buffer for the key value (in MySQL
+ format) */
+ uint buff_len,/*!< in: buffer length */
+ const uchar* record,
+ ibool* key_is_null)/*!< out: full key was null */
+{
+ KEY* key_info = table->key_info + keynr;
+ KEY_PART_INFO* key_part = key_info->key_part;
+ KEY_PART_INFO* end = key_part + key_info->user_defined_key_parts;
+ char* buff_start = buff;
+ enum_field_types mysql_type;
+ Field* field;
+
+ DBUG_ENTER("store_key_val_for_row");
+
+ bzero(buff, buff_len);
+ *key_is_null = TRUE;
+
+ for (; key_part != end; key_part++) {
+ uchar sorted[REC_VERSION_56_MAX_INDEX_COL_LEN] = {'\0'};
+ ibool part_is_null = FALSE;
+
+ if (key_part->null_bit) {
+ if (record[key_part->null_offset] &
+ key_part->null_bit) {
+ *buff = 1;
+ part_is_null = TRUE;
+ } else {
+ *buff = 0;
+ }
+ buff++;
+ }
+ if (!part_is_null) *key_is_null = FALSE;
+
+ field = key_part->field;
+ mysql_type = field->type();
+
+ if (mysql_type == MYSQL_TYPE_VARCHAR) {
+ /* >= 5.0.3 true VARCHAR */
+ ulint lenlen;
+ ulint len;
+ const byte* data;
+ ulint key_len;
+ ulint true_len;
+ CHARSET_INFO* cs;
+ int error=0;
+
+ key_len = key_part->length;
+
+ if (part_is_null) {
+ buff += key_len + 2;
+
+ continue;
+ }
+ cs = field->charset();
+
+ lenlen = (ulint)
+ (((Field_varstring*)field)->length_bytes);
+
+ data = row_mysql_read_true_varchar(&len,
+ (byte*) (record
+ + (ulint)get_field_offset(table, field)),
+ lenlen);
+
+ true_len = len;
+
+ /* For multi byte character sets we need to calculate
+ the true length of the key */
+
+ if (len > 0 && cs->mbmaxlen > 1) {
+ true_len = (ulint) cs->cset->well_formed_len(cs,
+ (const char *) data,
+ (const char *) data + len,
+ (uint) (key_len /
+ cs->mbmaxlen),
+ &error);
+ }
+
+ /* In a column prefix index, we may need to truncate
+ the stored value: */
+
+ if (true_len > key_len) {
+ true_len = key_len;
+ }
+
+ memcpy(sorted, data, true_len);
+ wsrep_innobase_mysql_sort(
+ mysql_type, cs->number, sorted, true_len);
+
+ if (wsrep_protocol_version > 1) {
+ memcpy(buff, sorted, true_len);
+ /* Note that we always reserve the maximum possible
+ length of the true VARCHAR in the key value, though
+ only len first bytes after the 2 length bytes contain
+ actual data. The rest of the space was reset to zero
+ in the bzero() call above. */
+ buff += true_len;
+ } else {
+ buff += key_len;
+ }
+ } else if (mysql_type == MYSQL_TYPE_TINY_BLOB
+ || mysql_type == MYSQL_TYPE_MEDIUM_BLOB
+ || mysql_type == MYSQL_TYPE_BLOB
+ || mysql_type == MYSQL_TYPE_LONG_BLOB
+ /* MYSQL_TYPE_GEOMETRY data is treated
+ as BLOB data in innodb. */
+ || mysql_type == MYSQL_TYPE_GEOMETRY) {
+
+ CHARSET_INFO* cs;
+ ulint key_len;
+ ulint true_len;
+ int error=0;
+ ulint blob_len;
+ const byte* blob_data;
+
+ ut_a(key_part->key_part_flag & HA_PART_KEY_SEG);
+
+ key_len = key_part->length;
+
+ if (part_is_null) {
+ buff += key_len + 2;
+
+ continue;
+ }
+
+ cs = field->charset();
+
+ blob_data = row_mysql_read_blob_ref(&blob_len,
+ (byte*) (record
+ + (ulint)get_field_offset(table, field)),
+ (ulint) field->pack_length());
+
+ true_len = blob_len;
+
+ ut_a(get_field_offset(table, field)
+ == key_part->offset);
+
+ /* For multi byte character sets we need to calculate
+ the true length of the key */
+
+ if (blob_len > 0 && cs->mbmaxlen > 1) {
+ true_len = (ulint) cs->cset->well_formed_len(cs,
+ (const char *) blob_data,
+ (const char *) blob_data
+ + blob_len,
+ (uint) (key_len /
+ cs->mbmaxlen),
+ &error);
+ }
+
+ /* All indexes on BLOB and TEXT are column prefix
+ indexes, and we may need to truncate the data to be
+ stored in the key value: */
+
+ if (true_len > key_len) {
+ true_len = key_len;
+ }
+
+ memcpy(sorted, blob_data, true_len);
+ wsrep_innobase_mysql_sort(
+ mysql_type, cs->number, sorted, true_len);
+
+ memcpy(buff, sorted, true_len);
+
+ /* Note that we always reserve the maximum possible
+ length of the BLOB prefix in the key value. */
+ if (wsrep_protocol_version > 1) {
+ buff += true_len;
+ } else {
+ buff += key_len;
+ }
+ } else {
+ /* Here we handle all other data types except the
+ true VARCHAR, BLOB and TEXT. Note that the column
+ value we store may be also in a column prefix
+ index. */
+
+ CHARSET_INFO* cs;
+ ulint true_len;
+ ulint key_len;
+ const uchar* src_start;
+ int error=0;
+ enum_field_types real_type;
+
+ key_len = key_part->length;
+
+ if (part_is_null) {
+ buff += key_len;
+
+ continue;
+ }
+
+ src_start = record + key_part->offset;
+ real_type = field->real_type();
+ true_len = key_len;
+
+ /* Character set for the field is defined only
+ to fields whose type is string and real field
+ type is not enum or set. For these fields check
+ if character set is multi byte. */
+
+ if (real_type != MYSQL_TYPE_ENUM
+ && real_type != MYSQL_TYPE_SET
+ && ( mysql_type == MYSQL_TYPE_VAR_STRING
+ || mysql_type == MYSQL_TYPE_STRING)) {
+
+ cs = field->charset();
+
+ /* For multi byte character sets we need to
+ calculate the true length of the key */
+
+ if (key_len > 0 && cs->mbmaxlen > 1) {
+
+ true_len = (ulint)
+ cs->cset->well_formed_len(cs,
+ (const char *)src_start,
+ (const char *)src_start
+ + key_len,
+ (uint) (key_len /
+ cs->mbmaxlen),
+ &error);
+ }
+ memcpy(sorted, src_start, true_len);
+ wsrep_innobase_mysql_sort(
+ mysql_type, cs->number, sorted, true_len);
+ memcpy(buff, sorted, true_len);
+ } else {
+ memcpy(buff, src_start, true_len);
+ }
+ buff += true_len;
+
+ /* Pad the unused space with spaces. */
+
+#ifdef REMOVED
+ if (true_len < key_len) {
+ ulint pad_len = key_len - true_len;
+ ut_a(!(pad_len % cs->mbminlen));
+
+ cs->cset->fill(cs, buff, pad_len,
+ 0x20 /* space */);
+ buff += pad_len;
+ }
+#endif /* REMOVED */
+ }
+ }
+
+ ut_a(buff <= buff_start + buff_len);
+
+ DBUG_RETURN((uint)(buff - buff_start));
+}
+#endif /* WITH_WSREP */
UNIV_INTERN
uint
ha_innobase::store_key_val_for_row(
@@ -6739,6 +7206,9 @@ ha_innobase::write_row(
dberr_t error;
int error_result= 0;
ibool auto_inc_used= FALSE;
+#ifdef WITH_WSREP
+ ibool auto_inc_inserted= FALSE; /* if NULL was inserted */
+#endif
ulint sql_command;
trx_t* trx = thd_to_trx(user_thd);
@@ -6772,8 +7242,18 @@ ha_innobase::write_row(
if ((sql_command == SQLCOM_ALTER_TABLE
|| sql_command == SQLCOM_OPTIMIZE
|| sql_command == SQLCOM_CREATE_INDEX
+#ifdef WITH_WSREP
+ || (wsrep_on(user_thd) && wsrep_load_data_splitting &&
+ sql_command == SQLCOM_LOAD)
+#endif /* WITH_WSREP */
|| sql_command == SQLCOM_DROP_INDEX)
&& num_write_row >= 10000) {
+#ifdef WITH_WSREP
+ if (wsrep_on(user_thd) && sql_command == SQLCOM_LOAD) {
+ WSREP_DEBUG("forced trx split for LOAD: %s",
+ wsrep_thd_query(user_thd));
+ }
+#endif /* WITH_WSREP */
/* ALTER TABLE is COMMITted at every 10000 copied rows.
The IX table lock for the original table has to be re-issued.
As this method will be called on a temporary table where the
@@ -6807,6 +7287,20 @@ no_commit:
*/
;
} else if (src_table == prebuilt->table) {
+#ifdef WITH_WSREP
+ switch (wsrep_run_wsrep_commit(user_thd, wsrep_hton, 1))
+ {
+ case WSREP_TRX_OK:
+ break;
+ case WSREP_TRX_ROLLBACK:
+ case WSREP_TRX_ERROR:
+ DBUG_RETURN(1);
+ }
+
+ if (binlog_hton->commit(binlog_hton, user_thd, 1))
+ DBUG_RETURN(1);
+ wsrep_post_commit(user_thd, TRUE);
+#endif /* WITH_WSREP */
/* Source table is not in InnoDB format:
no need to re-acquire locks on it. */
@@ -6817,6 +7311,20 @@ no_commit:
/* We will need an IX lock on the destination table. */
prebuilt->sql_stat_start = TRUE;
} else {
+#ifdef WITH_WSREP
+ switch (wsrep_run_wsrep_commit(user_thd, wsrep_hton, 1))
+ {
+ case WSREP_TRX_OK:
+ break;
+ case WSREP_TRX_ROLLBACK:
+ case WSREP_TRX_ERROR:
+ DBUG_RETURN(1);
+ }
+
+ if (binlog_hton->commit(binlog_hton, user_thd, 1))
+ DBUG_RETURN(1);
+ wsrep_post_commit(user_thd, TRUE);
+#endif /* WITH_WSREP */
/* Ensure that there are no other table locks than
LOCK_IX and LOCK_AUTO_INC on the destination table. */
@@ -6845,8 +7353,11 @@ no_commit:
/* Reset the error code before calling
innobase_get_auto_increment(). */
prebuilt->autoinc_error = DB_SUCCESS;
-
+#ifdef WITH_WSREP
+ auto_inc_inserted= (table->next_number_field->val_int() == 0);
+#endif
if ((error_result = update_auto_increment())) {
+
/* We don't want to mask autoinc overflow errors. */
/* Handle the case where the AUTOINC sub-system
@@ -6924,6 +7435,30 @@ no_commit:
case SQLCOM_REPLACE_SELECT:
goto set_max_autoinc;
+#ifdef WITH_WSREP
+ /* workaround for LP bug #355000, retrying the insert */
+ case SQLCOM_INSERT:
+ if (wsrep_on(current_thd) &&
+ auto_inc_inserted &&
+ wsrep_drupal_282555_workaround &&
+ !thd_test_options(current_thd,
+ OPTION_NOT_AUTOCOMMIT |
+ OPTION_BEGIN)) {
+ WSREP_DEBUG(
+ "retrying insert: %s",
+ (*wsrep_thd_query(current_thd)) ?
+ wsrep_thd_query(current_thd) :
+ (char *)"void");
+ error= DB_SUCCESS;
+ wsrep_thd_set_conflict_state(
+ current_thd, MUST_ABORT);
+ innobase_srv_conc_exit_innodb(prebuilt->trx);
+ /* jump straight to func exit over
+ * later wsrep hooks */
+ goto func_exit;
+ }
+ break;
+#endif
default:
break;
}
@@ -6982,6 +7517,20 @@ report_error:
error_result = convert_error_code_to_mysql(error,
prebuilt->table->flags,
user_thd);
+#ifdef WITH_WSREP
+ if (!error_result && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
+ wsrep_on(user_thd) && !wsrep_consistency_check(user_thd) &&
+ (sql_command != SQLCOM_LOAD ||
+ thd_binlog_format(user_thd) == BINLOG_FORMAT_ROW)) {
+
+ if (wsrep_append_keys(user_thd, false, record, NULL)) {
+ DBUG_PRINT("wsrep", ("row key failed"));
+ error_result = HA_ERR_INTERNAL_ERROR;
+ goto wsrep_error;
+ }
+ }
+wsrep_error:
+#endif
if (error_result == HA_FTS_INVALID_DOCID) {
my_error(HA_FTS_INVALID_DOCID, MYF(0));
@@ -7270,6 +7819,87 @@ calc_row_difference(
return(DB_SUCCESS);
}
+#ifdef WITH_WSREP
+static
+int
+wsrep_calc_row_hash(
+/*================*/
+ byte* digest, /*!< in/out: md5 sum */
+ const uchar* row, /*!< in: row in MySQL format */
+ TABLE* table, /*!< in: table in MySQL data
+ dictionary */
+ row_prebuilt_t* prebuilt, /*!< in: InnoDB prebuilt struct */
+ THD* thd) /*!< in: user thread */
+{
+ Field* field;
+ enum_field_types field_mysql_type;
+ uint n_fields;
+ ulint len;
+ const byte* ptr;
+ ulint col_type;
+ uint i;
+
+ void *ctx = wsrep_md5_init();
+
+ n_fields = table->s->fields;
+
+ for (i = 0; i < n_fields; i++) {
+ byte null_byte=0;
+ byte true_byte=1;
+
+ field = table->field[i];
+
+ ptr = (const byte*) row + get_field_offset(table, field);
+ len = field->pack_length();
+
+ field_mysql_type = field->type();
+
+ col_type = prebuilt->table->cols[i].mtype;
+
+ switch (col_type) {
+
+ case DATA_BLOB:
+ ptr = row_mysql_read_blob_ref(&len, ptr, len);
+
+ break;
+
+ case DATA_VARCHAR:
+ case DATA_BINARY:
+ case DATA_VARMYSQL:
+ if (field_mysql_type == MYSQL_TYPE_VARCHAR) {
+ /* This is a >= 5.0.3 type true VARCHAR where
+ the real payload data length is stored in
+ 1 or 2 bytes */
+
+ ptr = row_mysql_read_true_varchar(
+ &len, ptr,
+ (ulint)
+ (((Field_varstring*)field)->length_bytes));
+
+ }
+
+ break;
+ default:
+ ;
+ }
+ /*
+ if (field->null_ptr &&
+ field_in_record_is_null(table, field, (char*) row)) {
+ */
+
+ if (field->is_null_in_record(row)) {
+ wsrep_md5_update(ctx, (char*)&null_byte, 1);
+ } else {
+ wsrep_md5_update(ctx, (char*)&true_byte, 1);
+ wsrep_md5_update(ctx, (char*)ptr, len);
+ }
+ }
+
+ wsrep_compute_md5_hash((char*)digest, ctx);
+
+ return(0);
+}
+#endif /* WITH_WSREP */
/**********************************************************************//**
Updates a row given as a parameter to a new value. Note that we are given
whole rows, not just the fields which are updated: this incurs some
@@ -7407,6 +8037,23 @@ func_exit:
innobase_active_small();
+#ifdef WITH_WSREP
+ if (error == DB_SUCCESS &&
+ wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
+ wsrep_on(user_thd)) {
+
+ DBUG_PRINT("wsrep", ("update row key"));
+
+ if (wsrep_append_keys(user_thd, false, old_row, new_row)) {
+ WSREP_DEBUG("WSREP: UPDATE_ROW_KEY FAILED");
+ DBUG_PRINT("wsrep", ("row key failed"));
+ err = HA_ERR_INTERNAL_ERROR;
+ goto wsrep_error;
+ }
+ }
+wsrep_error:
+#endif
+
DBUG_RETURN(err);
}
@@ -7454,6 +8101,19 @@ ha_innobase::delete_row(
innobase_active_small();
+#ifdef WITH_WSREP
+ if (error == DB_SUCCESS &&
+ wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
+ wsrep_on(user_thd)) {
+
+ if (wsrep_append_keys(user_thd, false, record, NULL)) {
+ DBUG_PRINT("wsrep", ("delete fail"));
+ error = (dberr_t)HA_ERR_INTERNAL_ERROR;
+ goto wsrep_error;
+ }
+ }
+wsrep_error:
+#endif
DBUG_RETURN(convert_error_code_to_mysql(
error, prebuilt->table->flags, user_thd));
}
@@ -8600,7 +9260,381 @@ ha_innobase::ft_end()
rnd_end();
}
+#ifdef WITH_WSREP
+extern dict_index_t*
+wsrep_dict_foreign_find_index(
+ dict_table_t* table,
+ const char** columns,
+ ulint n_cols,
+ dict_index_t* types_idx,
+ ibool check_charsets,
+ ulint check_null);
+
+extern dberr_t
+wsrep_append_foreign_key(
+/*===========================*/
+ trx_t* trx, /*!< in: trx */
+ dict_foreign_t* foreign, /*!< in: foreign key constraint */
+ const rec_t* rec, /*!<in: clustered index record */
+ dict_index_t* index, /*!<in: clustered index */
+ ibool referenced, /*!<in: is check for referenced table */
+ ibool shared) /*!<in: is shared access */
+{
+ ut_a(trx);
+ THD* thd = (THD*)trx->mysql_thd;
+ ulint rcode = DB_SUCCESS;
+ char cache_key[513] = {'\0'};
+ int cache_key_len;
+ bool const copy = true;
+
+ if (!wsrep_on(trx->mysql_thd) ||
+ wsrep_thd_exec_mode(thd) != LOCAL_STATE)
+ return DB_SUCCESS;
+
+ if (!thd || !foreign ||
+ (!foreign->referenced_table && !foreign->foreign_table))
+ {
+ WSREP_INFO("FK: %s missing in: %s",
+ (!thd) ? "thread" :
+ ((!foreign) ? "constraint" :
+ ((!foreign->referenced_table) ?
+ "referenced table" : "foreign table")),
+ (thd && wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void");
+ return DB_ERROR;
+ }
+
+ if ( !((referenced) ?
+ foreign->referenced_table : foreign->foreign_table))
+ {
+ WSREP_DEBUG("pulling %s table into cache",
+ (referenced) ? "referenced" : "foreign");
+ mutex_enter(&(dict_sys->mutex));
+ if (referenced)
+ {
+ foreign->referenced_table =
+ dict_table_get_low(
+ foreign->referenced_table_name_lookup);
+ if (foreign->referenced_table)
+ {
+ foreign->referenced_index =
+ wsrep_dict_foreign_find_index(
+ foreign->referenced_table,
+ foreign->referenced_col_names,
+ foreign->n_fields,
+ foreign->foreign_index,
+ TRUE, FALSE);
+ }
+ }
+ else
+ {
+ foreign->foreign_table =
+ dict_table_get_low(
+ foreign->foreign_table_name_lookup);
+ if (foreign->foreign_table)
+ {
+ foreign->foreign_index =
+ wsrep_dict_foreign_find_index(
+ foreign->foreign_table,
+ foreign->foreign_col_names,
+ foreign->n_fields,
+ foreign->referenced_index,
+ TRUE, FALSE);
+ }
+ }
+ mutex_exit(&(dict_sys->mutex));
+ }
+
+ if ( !((referenced) ?
+ foreign->referenced_table : foreign->foreign_table))
+ {
+ WSREP_WARN("FK: %s missing in query: %s",
+ (!foreign->referenced_table) ?
+ "referenced table" : "foreign table",
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void");
+ return DB_ERROR;
+ }
+ byte key[WSREP_MAX_SUPPORTED_KEY_LENGTH+1];
+ ulint len = WSREP_MAX_SUPPORTED_KEY_LENGTH;
+
+ dict_index_t *idx_target = (referenced) ?
+ foreign->referenced_index : index;
+ dict_index_t *idx = (referenced) ?
+ UT_LIST_GET_FIRST(foreign->referenced_table->indexes) :
+ UT_LIST_GET_FIRST(foreign->foreign_table->indexes);
+ int i = 0;
+ while (idx != NULL && idx != idx_target) {
+ if (innobase_strcasecmp (idx->name, innobase_index_reserve_name) != 0) {
+ i++;
+ }
+ idx = UT_LIST_GET_NEXT(indexes, idx);
+ }
+ ut_a(idx);
+ key[0] = (char)i;
+
+ rcode = wsrep_rec_get_foreign_key(
+ &key[1], &len, rec, index, idx,
+ wsrep_protocol_version > 1);
+ if (rcode != DB_SUCCESS) {
+ WSREP_ERROR(
+ "FK key set failed: %lu (%lu %lu), index: %s %s, %s",
+ rcode, referenced, shared,
+ (index && index->name) ? index->name :
+ "void index",
+ (index && index->table_name) ? index->table_name :
+ "void table",
+ wsrep_thd_query(thd));
+ return DB_ERROR;
+ }
+ strncpy(cache_key,
+ (wsrep_protocol_version > 1) ?
+ ((referenced) ?
+ foreign->referenced_table->name :
+ foreign->foreign_table->name) :
+ foreign->foreign_table->name, sizeof(cache_key) - 1);
+ cache_key_len = strlen(cache_key);
+#ifdef WSREP_DEBUG_PRINT
+ ulint j;
+ fprintf(stderr, "FK parent key, table: %s %s len: %lu ",
+ cache_key, (shared) ? "shared" : "exclusive", len+1);
+ for (j=0; j<len+1; j++) {
+ fprintf(stderr, " %hhX, ", key[j]);
+ }
+ fprintf(stderr, "\n");
+#endif
+ char *p = strchr(cache_key, '/');
+ if (p) {
+ *p = '\0';
+ } else {
+ WSREP_WARN("unexpected foreign key table %s %s",
+ foreign->referenced_table->name,
+ foreign->foreign_table->name);
+ }
+
+ wsrep_buf_t wkey_part[3];
+ wsrep_key_t wkey = {wkey_part, 3};
+ if (!wsrep_prepare_key_for_innodb(
+ (const uchar*)cache_key,
+ cache_key_len + 1,
+ (const uchar*)key, len+1,
+ wkey_part,
+ (size_t*)&wkey.key_parts_num)) {
+ WSREP_WARN("key prepare failed for cascaded FK: %s",
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void");
+ return DB_ERROR;
+ }
+ rcode = (int)wsrep->append_key(
+ wsrep,
+ wsrep_ws_handle(thd, trx),
+ &wkey,
+ 1,
+ shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE,
+ copy);
+ if (rcode) {
+ DBUG_PRINT("wsrep", ("row key failed: %lu", rcode));
+ WSREP_ERROR("Appending cascaded fk row key failed: %s, %lu",
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void", rcode);
+ return DB_ERROR;
+ }
+
+ return DB_SUCCESS;
+}
+
+static int
+wsrep_append_key(
+/*==================*/
+ THD *thd,
+ trx_t *trx,
+ TABLE_SHARE *table_share,
+ TABLE *table,
+ const char* key,
+ uint16_t key_len,
+ bool shared
+)
+{
+ DBUG_ENTER("wsrep_append_key");
+ bool const copy = true;
+#ifdef WSREP_DEBUG_PRINT
+ fprintf(stderr, "%s conn %ld, trx %llu, keylen %d, table %s ",
+ (shared) ? "Shared" : "Exclusive",
+ wsrep_thd_thread_id(thd), trx->id, key_len,
+ table_share->table_name.str);
+ for (int i=0; i<key_len; i++) {
+ fprintf(stderr, "%hhX, ", key[i]);
+ }
+ fprintf(stderr, "\n");
+#endif
+ wsrep_buf_t wkey_part[3];
+ wsrep_key_t wkey = {wkey_part, 3};
+ if (!wsrep_prepare_key_for_innodb(
+ (const uchar*)table_share->table_cache_key.str,
+ table_share->table_cache_key.length,
+ (const uchar*)key, key_len,
+ wkey_part,
+ (size_t*)&wkey.key_parts_num)) {
+ WSREP_WARN("key prepare failed for: %s",
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void");
+ DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+ }
+
+ int rcode = (int)wsrep->append_key(
+ wsrep,
+ wsrep_ws_handle(thd, trx),
+ &wkey,
+ 1,
+ shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE,
+ copy);
+ if (rcode) {
+ DBUG_PRINT("wsrep", ("row key failed: %d", rcode));
+ WSREP_WARN("Appending row key failed: %s, %d",
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void", rcode);
+ DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+ }
+ DBUG_RETURN(0);
+}
+
+extern void compute_md5_hash(char *digest, const char *buf, int len);
+#define MD5_HASH compute_md5_hash
+
+int
+ha_innobase::wsrep_append_keys(
+/*==================*/
+ THD *thd,
+ bool shared,
+ const uchar* record0, /* in: row in MySQL format */
+ const uchar* record1) /* in: row in MySQL format */
+{
+ int rcode;
+ DBUG_ENTER("wsrep_append_keys");
+
+ bool key_appended = false;
+ trx_t *trx = thd_to_trx(thd);
+
+ if (table_share && table_share->tmp_table != NO_TMP_TABLE) {
+ WSREP_DEBUG("skipping tmp table DML: THD: %lu tmp: %d SQL: %s",
+ wsrep_thd_thread_id(thd),
+ table_share->tmp_table,
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void");
+ DBUG_RETURN(0);
+ }
+
+ if (wsrep_protocol_version == 0) {
+ uint len;
+ char keyval[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+ char *key = &keyval[0];
+ KEY *key_info = table->key_info;
+ ibool is_null;
+
+ len = wsrep_store_key_val_for_row(
+ table, 0, key, key_info->key_length, record0, &is_null);
+
+ if (!is_null) {
+ rcode = wsrep_append_key(
+ thd, trx, table_share, table, keyval,
+ len, shared);
+ if (rcode) DBUG_RETURN(rcode);
+ }
+ else
+ {
+ WSREP_DEBUG("NULL key skipped (proto 0): %s",
+ wsrep_thd_query(thd));
+ }
+ } else {
+ ut_a(table->s->keys <= 256);
+ uint i;
+ for (i=0; i<table->s->keys; ++i) {
+ uint len;
+ char keyval0[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+ char keyval1[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+ char* key0 = &keyval0[1];
+ char* key1 = &keyval1[1];
+ KEY* key_info = table->key_info + i;
+ ibool is_null;
+
+ dict_index_t* idx = innobase_get_index(i);
+ dict_table_t* tab = (idx) ? idx->table : NULL;
+
+ keyval0[0] = (char)i;
+ keyval1[0] = (char)i;
+
+ if (!tab) {
+ WSREP_WARN("MySQL-InnoDB key mismatch %s %s",
+ table->s->table_name.str,
+ key_info->name);
+ }
+ if (key_info->flags & HA_NOSAME ||
+ ((tab &&
+ dict_table_get_referenced_constraint(tab, idx)) ||
+ (!tab && referenced_by_foreign_key()))) {
+
+ if (key_info->flags & HA_NOSAME || shared)
+ key_appended = true;
+
+ len = wsrep_store_key_val_for_row(
+ table, i, key0, key_info->key_length,
+ record0, &is_null);
+ if (!is_null) {
+ rcode = wsrep_append_key(
+ thd, trx, table_share, table,
+ keyval0, len+1, shared);
+ if (rcode) DBUG_RETURN(rcode);
+ }
+ else
+ {
+ WSREP_DEBUG("NULL key skipped: %s",
+ wsrep_thd_query(thd));
+ }
+ if (record1) {
+ len = wsrep_store_key_val_for_row(
+ table, i, key1, key_info->key_length,
+ record1, &is_null);
+ if (!is_null && memcmp(key0, key1, len)) {
+ rcode = wsrep_append_key(
+ thd, trx, table_share,
+ table,
+ keyval1, len+1, shared);
+ if (rcode) DBUG_RETURN(rcode);
+ }
+ }
+ }
+ }
+ }
+
+ /* if no PK, calculate hash of full row, to be the key value */
+ if (!key_appended && wsrep_certify_nonPK) {
+ uchar digest[16];
+ int rcode;
+
+ wsrep_calc_row_hash(digest, record0, table, prebuilt, thd);
+ if ((rcode = wsrep_append_key(thd, trx, table_share, table,
+ (const char*) digest, 16,
+ shared))) {
+ DBUG_RETURN(rcode);
+ }
+
+ if (record1) {
+ wsrep_calc_row_hash(
+ digest, record1, table, prebuilt, thd);
+ if ((rcode = wsrep_append_key(thd, trx, table_share,
+ table,
+ (const char*) digest,
+ 16, shared))) {
+ DBUG_RETURN(rcode);
+ }
+ }
+ DBUG_RETURN(0);
+ }
+
+ DBUG_RETURN(0);
+}
+#endif
/*********************************************************************//**
Stores a reference to the current row to 'ref' field of the handle. Note
that in the case where we have generated the clustered index for the
@@ -12307,11 +13341,18 @@ ha_innobase::external_lock(
/* used by test case */
DBUG_EXECUTE_IF("no_innodb_binlog_errors", skip = 1;);
if (!skip) {
+#ifdef WITH_WSREP
+ if (!wsrep_on(thd) || wsrep_thd_exec_mode(thd) == LOCAL_STATE)
+ {
+#endif /* WITH_WSREP */
my_error(ER_BINLOG_STMT_MODE_AND_ROW_ENGINE, MYF(0),
" InnoDB is limited to row-logging when "
"transaction isolation level is "
"READ COMMITTED or READ UNCOMMITTED.");
DBUG_RETURN(HA_ERR_LOGGING_IMPOSSIBLE);
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
}
}
@@ -13735,6 +14776,9 @@ innobase_xa_prepare(
to the session variable take effect only in the next transaction */
if (!trx->support_xa) {
+#ifdef WITH_WSREP
+ thd_get_xid(thd, (MYSQL_XID*) &trx->xid);
+#endif // WITH_WSREP
return(0);
}
@@ -15644,6 +16688,295 @@ static SHOW_VAR innodb_status_variables_export[]= {
static struct st_mysql_storage_engine innobase_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
+#ifdef WITH_WSREP
+void
+wsrep_abort_slave_trx(wsrep_seqno_t bf_seqno, wsrep_seqno_t victim_seqno)
+{
+ WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be "
+ "caused by:\n\t"
+ "1) unsupported configuration options combination, please check documentation.\n\t"
+ "2) a bug in the code.\n\t"
+ "3) a database corruption.\n Node consistency compromized, "
+ "need to abort. Restart the node to resync with cluster.",
+ (long long)bf_seqno, (long long)victim_seqno);
+ abort();
+}
+int
+wsrep_innobase_kill_one_trx(void * const bf_thd_ptr,
+ const trx_t * const bf_trx,
+ trx_t *victim_trx, ibool signal)
+{
+ ut_ad(lock_mutex_own());
+ ut_ad(trx_mutex_own(victim_trx));
+ ut_ad(bf_thd_ptr);
+ ut_ad(victim_trx);
+
+ DBUG_ENTER("wsrep_innobase_kill_one_trx");
+ THD *bf_thd = bf_thd_ptr ? (THD*) bf_thd_ptr : NULL;
+ THD *thd = (THD *) victim_trx->mysql_thd;
+ int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0;
+
+ if (!thd) {
+ DBUG_PRINT("wsrep", ("no thd for conflicting lock"));
+ WSREP_WARN("no THD for trx: %lu", victim_trx->id);
+ DBUG_RETURN(1);
+ }
+ if (!bf_thd) {
+ DBUG_PRINT("wsrep", ("no BF thd for conflicting lock"));
+ WSREP_WARN("no BF THD for trx: %lu", (bf_trx) ? bf_trx->id : 0);
+ DBUG_RETURN(1);
+ }
+
+ WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
+
+ WSREP_DEBUG("BF kill (%lu, seqno: %lld), victim: (%lu) trx: %lu",
+ signal, (long long)bf_seqno,
+ wsrep_thd_thread_id(thd),
+ victim_trx->id);
+
+ WSREP_DEBUG("Aborting query: %s",
+ (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void");
+
+ wsrep_thd_LOCK(thd);
+
+ if (wsrep_thd_query_state(thd) == QUERY_EXITING) {
+ WSREP_DEBUG("kill trx EXITING for %lu", victim_trx->id);
+ wsrep_thd_UNLOCK(thd);
+ DBUG_RETURN(0);
+ }
+ if(wsrep_thd_exec_mode(thd) != LOCAL_STATE) {
+ WSREP_DEBUG("withdraw for BF trx: %lu, state: %d",
+ victim_trx->id,
+ wsrep_thd_conflict_state(thd));
+ }
+
+ switch (wsrep_thd_conflict_state(thd)) {
+ case NO_CONFLICT:
+ wsrep_thd_set_conflict_state(thd, MUST_ABORT);
+ break;
+ case MUST_ABORT:
+ WSREP_DEBUG("victim %lu in MUST ABORT state",
+ victim_trx->id);
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ DBUG_RETURN(0);
+ break;
+ case ABORTED:
+ case ABORTING: // fall through
+ default:
+ WSREP_DEBUG("victim %lu in state %d",
+ victim_trx->id, wsrep_thd_conflict_state(thd));
+ wsrep_thd_UNLOCK(thd);
+ DBUG_RETURN(0);
+ break;
+ }
+
+ switch (wsrep_thd_query_state(thd)) {
+ case QUERY_COMMITTING:
+ enum wsrep_status rcode;
+
+ WSREP_DEBUG("kill query for: %ld",
+ wsrep_thd_thread_id(thd));
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+
+ WSREP_DEBUG("kill trx QUERY_COMMITTING for %lu",
+ victim_trx->id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
+ wsrep_abort_slave_trx(bf_seqno,
+ wsrep_thd_trx_seqno(thd));
+ } else {
+ rcode = wsrep->abort_pre_commit(
+ wsrep, bf_seqno,
+ (wsrep_trx_id_t)victim_trx->id
+ );
+
+ switch (rcode) {
+ case WSREP_WARNING:
+ WSREP_DEBUG("cancel commit warning: %lu",
+ victim_trx->id);
+ DBUG_RETURN(1);
+ break;
+ case WSREP_OK:
+ break;
+ default:
+ WSREP_ERROR(
+ "cancel commit bad exit: %d %lu",
+ rcode,
+ victim_trx->id);
+ /* unable to interrupt, must abort */
+ /* note: kill_mysql() will block, if we cannot.
+ * kill the lock holder first.
+ */
+ abort();
+ break;
+ }
+ }
+ break;
+ case QUERY_EXEC:
+ /* it is possible that victim trx is itself waiting for some
+ * other lock. We need to cancel this waiting
+ */
+ WSREP_DEBUG("kill trx QUERY_EXEC for %lu", victim_trx->id);
+
+ victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
+ if (victim_trx->lock.wait_lock) {
+ WSREP_DEBUG("victim has wait flag: %ld",
+ wsrep_thd_thread_id(thd));
+ lock_t* wait_lock = victim_trx->lock.wait_lock;
+ if (wait_lock) {
+ WSREP_DEBUG("canceling wait lock");
+ victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
+ lock_cancel_waiting_and_release(wait_lock);
+ }
+
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ } else {
+ /* abort currently executing query */
+ DBUG_PRINT("wsrep",("sending KILL_QUERY to: %ld",
+ wsrep_thd_thread_id(thd)));
+ WSREP_DEBUG("kill query for: %ld",
+ wsrep_thd_thread_id(thd));
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+
+ /* for BF thd, we need to prevent him from committing */
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
+ wsrep_abort_slave_trx(bf_seqno,
+ wsrep_thd_trx_seqno(thd));
+ }
+ }
+ break;
+ case QUERY_IDLE:
+ {
+ bool skip_abort= false;
+ wsrep_aborting_thd_t abortees;
+
+ WSREP_DEBUG("kill IDLE for %lu", victim_trx->id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
+ WSREP_DEBUG("kill BF IDLE, seqno: %lld",
+ (long long)wsrep_thd_trx_seqno(thd));
+ wsrep_thd_UNLOCK(thd);
+ wsrep_abort_slave_trx(bf_seqno,
+ wsrep_thd_trx_seqno(thd));
+ DBUG_RETURN(0);
+ }
+ /* This will lock thd from proceeding after net_read() */
+ wsrep_thd_set_conflict_state(thd, ABORTING);
+
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+
+ abortees = wsrep_aborting_thd;
+ while (abortees && !skip_abort) {
+ /* check if we have a kill message for this already */
+ if (abortees->aborting_thd == thd) {
+ skip_abort = true;
+ WSREP_WARN("duplicate thd aborter %lu",
+ wsrep_thd_thread_id(thd));
+ }
+ abortees = abortees->next;
+ }
+ if (!skip_abort) {
+ wsrep_aborting_thd_t aborting = (wsrep_aborting_thd_t)
+ my_malloc(sizeof(struct wsrep_aborting_thd),
+ MYF(0));
+ aborting->aborting_thd = thd;
+ aborting->next = wsrep_aborting_thd;
+ wsrep_aborting_thd = aborting;
+ DBUG_PRINT("wsrep",("enqueuing trx abort for %lu",
+ wsrep_thd_thread_id(thd)));
+ WSREP_DEBUG("enqueuing trx abort for (%lu)",
+ wsrep_thd_thread_id(thd));
+ }
+
+ DBUG_PRINT("wsrep",("signalling wsrep rollbacker"));
+ WSREP_DEBUG("signaling aborter");
+ mysql_cond_signal(&COND_wsrep_rollback);
+ mysql_mutex_unlock(&LOCK_wsrep_rollback);
+ wsrep_thd_UNLOCK(thd);
+
+ break;
+ }
+ default:
+ WSREP_WARN("bad wsrep query state: %d",
+ wsrep_thd_query_state(thd));
+ wsrep_thd_UNLOCK(thd);
+ break;
+ }
+
+ DBUG_RETURN(0);
+}
+static int
+wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
+ my_bool signal)
+{
+ DBUG_ENTER("wsrep_innobase_abort_thd");
+ trx_t* victim_trx = thd_to_trx(victim_thd);
+ trx_t* bf_trx = (bf_thd) ? thd_to_trx(bf_thd) : NULL;
+ WSREP_DEBUG("abort transaction: BF: %s victim: %s",
+ wsrep_thd_query(bf_thd),
+ wsrep_thd_query(victim_thd));
+
+ if (victim_trx)
+ {
+ lock_mutex_enter();
+ trx_mutex_enter(victim_trx);
+ int rcode = wsrep_innobase_kill_one_trx(bf_thd, bf_trx,
+ victim_trx, signal);
+ trx_mutex_exit(victim_trx);
+ lock_mutex_exit();
+ wsrep_srv_conc_cancel_wait(victim_trx);
+
+ DBUG_RETURN(rcode);
+ } else {
+ WSREP_DEBUG("victim does not have transaction");
+ wsrep_thd_LOCK(victim_thd);
+ wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT);
+ wsrep_thd_UNLOCK(victim_thd);
+ wsrep_thd_awake(victim_thd, signal);
+ }
+ DBUG_RETURN(-1);
+}
+
+static int innobase_wsrep_set_checkpoint(handlerton* hton, const XID* xid)
+{
+ DBUG_ASSERT(hton == innodb_hton_ptr);
+ if (wsrep_is_wsrep_xid(xid)) {
+ mtr_t mtr;
+ mtr_start(&mtr);
+ trx_sysf_t* sys_header = trx_sysf_get(&mtr);
+ trx_sys_update_wsrep_checkpoint(xid, sys_header, &mtr);
+ mtr_commit(&mtr);
+ return 0;
+ } else {
+ return 1;
+ }
+}
+
+static int innobase_wsrep_get_checkpoint(handlerton* hton, XID* xid)
+{
+ DBUG_ASSERT(hton == innodb_hton_ptr);
+ trx_sys_read_wsrep_checkpoint(xid);
+ return 0;
+}
+
+static void
+wsrep_fake_trx_id(
+/*==================*/
+ handlerton *hton,
+ THD *thd) /*!< in: user thread handle */
+{
+ mutex_enter(&trx_sys->mutex);
+ trx_id_t trx_id = trx_sys_get_new_trx_id();
+ mutex_exit(&trx_sys->mutex);
+
+ (void *)wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd), trx_id);
+}
+
+#endif /* WITH_WSREP */
/* plugin options */
static MYSQL_SYSVAR_ENUM(checksum_algorithm, srv_checksum_algorithm,
@@ -16339,6 +17672,40 @@ static MYSQL_SYSVAR_BOOL(disable_background_merge,
NULL, NULL, FALSE);
#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
+#ifdef WITH_INNODB_DISALLOW_WRITES
+/*******************************************************
+ * innobase_disallow_writes variable definition *
+ *******************************************************/
+
+/* Must always init to FALSE. */
+static my_bool innobase_disallow_writes = FALSE;
+
+/**************************************************************************
+An "update" method for innobase_disallow_writes variable. */
+static
+void
+innobase_disallow_writes_update(
+/*============================*/
+ THD* thd, /* in: thread handle */
+ st_mysql_sys_var* var, /* in: pointer to system
+ variable */
+ void* var_ptr, /* out: pointer to dynamic
+ variable */
+ const void* save) /* in: temporary storage */
+{
+ *(my_bool*)var_ptr = *(my_bool*)save;
+ ut_a(srv_allow_writes_event);
+ if (*(my_bool*)var_ptr)
+ os_event_reset(srv_allow_writes_event);
+ else
+ os_event_set(srv_allow_writes_event);
+}
+
+static MYSQL_SYSVAR_BOOL(disallow_writes, innobase_disallow_writes,
+ PLUGIN_VAR_NOCMDOPT,
+ "Tell InnoDB to stop any writes to disk",
+ NULL, innobase_disallow_writes_update, FALSE);
+#endif /* WITH_INNODB_DISALLOW_WRITES */
static MYSQL_SYSVAR_BOOL(random_read_ahead, srv_random_read_ahead,
PLUGIN_VAR_NOCMDARG,
"Whether to use read ahead for random access within an extent.",
@@ -16534,6 +17901,9 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(change_buffering_debug),
MYSQL_SYSVAR(disable_background_merge),
#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
+#ifdef WITH_INNODB_DISALLOW_WRITES
+ MYSQL_SYSVAR(disallow_writes),
+#endif /* WITH_INNODB_DISALLOW_WRITES */
MYSQL_SYSVAR(random_read_ahead),
MYSQL_SYSVAR(read_ahead_threshold),
MYSQL_SYSVAR(read_only),
diff --git a/storage/innobase/handler/ha_innodb.h b/storage/innobase/handler/ha_innodb.h
index 1fb071f5765..f4eb2bed1d4 100644
--- a/storage/innobase/handler/ha_innodb.h
+++ b/storage/innobase/handler/ha_innodb.h
@@ -102,6 +102,10 @@ class ha_innobase: public handler
void innobase_initialize_autoinc();
dict_index_t* innobase_get_index(uint keynr);
+#ifdef WITH_WSREP
+ int wsrep_append_keys(THD *thd, bool shared,
+ const uchar* record0, const uchar* record1);
+#endif
/* Init values for the class: */
public:
ha_innobase(handlerton *hton, TABLE_SHARE *table_arg);
@@ -447,7 +451,40 @@ __attribute__((nonnull));
*/
extern void mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file);
-struct trx_t;
+#ifdef WITH_WSREP
+#include <wsrep_mysqld.h>
+//extern "C" int wsrep_trx_order_before(void *thd1, void *thd2);
+
+extern "C" bool wsrep_thd_is_wsrep_on(THD *thd);
+
+extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd);
+extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd);
+extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd);
+extern "C" const char * wsrep_thd_exec_mode_str(THD *thd);
+extern "C" const char * wsrep_thd_conflict_state_str(THD *thd);
+extern "C" const char * wsrep_thd_query_state_str(THD *thd);
+extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd);
+
+extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode);
+extern "C" void wsrep_thd_set_query_state(
+ THD *thd, enum wsrep_query_state state);
+extern "C" void wsrep_thd_set_conflict_state(
+ THD *thd, enum wsrep_conflict_state state);
+
+extern "C" void wsrep_thd_set_trx_to_replay(THD *thd, uint64 trx_id);
+
+extern "C"void wsrep_thd_LOCK(THD *thd);
+extern "C"void wsrep_thd_UNLOCK(THD *thd);
+extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd);
+extern "C" time_t wsrep_thd_query_start(THD *thd);
+extern "C" my_thread_id wsrep_thd_thread_id(THD *thd);
+extern "C" int64_t wsrep_thd_trx_seqno(THD *thd);
+extern "C" query_id_t wsrep_thd_query_id(THD *thd);
+extern "C" char * wsrep_thd_query(THD *thd);
+extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd);
+extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id);
+extern "C" void wsrep_thd_awake(THD* thd, my_bool signal);
+#endif
extern const struct _ft_vft ft_vft_result;
@@ -485,6 +522,9 @@ innobase_index_name_is_reserved(
__attribute__((nonnull, warn_unused_result));
/*****************************************************************//**
+#ifdef WITH_WSREP
+extern "C" int wsrep_trx_is_aborting(void *thd_ptr);
+#endif
Determines InnoDB table flags.
@retval true if successful, false if error */
UNIV_INTERN
diff --git a/storage/innobase/handler/handler0alter.cc b/storage/innobase/handler/handler0alter.cc
index a120534b36d..fe48c23e4b2 100644
--- a/storage/innobase/handler/handler0alter.cc
+++ b/storage/innobase/handler/handler0alter.cc
@@ -47,6 +47,10 @@ Smart ALTER TABLE
#include "fts0priv.h"
#include "pars0pars.h"
+#ifdef WITH_WSREP
+//#include "wsrep_api.h"
+#include <sql_acl.h> // PROCESS_ACL
+#endif
#include "ha_innodb.h"
/** Operations for creating an index in place */
diff --git a/storage/innobase/include/dict0mem.h b/storage/innobase/include/dict0mem.h
index 671f67eb1f8..3077162111a 100644
--- a/storage/innobase/include/dict0mem.h
+++ b/storage/innobase/include/dict0mem.h
@@ -463,6 +463,9 @@ be REC_VERSION_56_MAX_INDEX_COL_LEN (3072) bytes */
/** Defines the maximum fixed length column size */
#define DICT_MAX_FIXED_COL_LEN DICT_ANTELOPE_MAX_INDEX_COL_LEN
+#ifdef WITH_WSREP
+#define WSREP_MAX_SUPPORTED_KEY_LENGTH 3500
+#endif /* WITH_WSREP */
/** Data structure for a field in an index */
struct dict_field_t{
diff --git a/storage/innobase/include/ha_prototypes.h b/storage/innobase/include/ha_prototypes.h
index a16ce656f04..08ac2a74405 100644
--- a/storage/innobase/include/ha_prototypes.h
+++ b/storage/innobase/include/ha_prototypes.h
@@ -286,6 +286,19 @@ innobase_casedn_str(
/*================*/
char* a); /*!< in/out: string to put in lower case */
+#ifdef WITH_WSREP
+UNIV_INTERN
+int
+wsrep_innobase_kill_one_trx(void *thd_ptr,
+ const trx_t *bf_trx, trx_t *victim_trx, ibool signal);
+extern "C" int wsrep_thd_is_brute_force(void *thd_ptr);
+int wsrep_trx_order_before(void *thd1, void *thd2);
+void wsrep_innobase_mysql_sort(int mysql_type, uint charset_number,
+ unsigned char* str, unsigned int str_length);
+int
+wsrep_on(void *thd_ptr);
+extern "C" int wsrep_is_wsrep_xid(const void*);
+#endif /* WITH_WSREP */
/**********************************************************************//**
Determines the connection character set.
@return connection character set */
diff --git a/storage/innobase/include/lock0lock.h b/storage/innobase/include/lock0lock.h
index 8e6fdaed3d5..24e612e63f5 100644
--- a/storage/innobase/include/lock0lock.h
+++ b/storage/innobase/include/lock0lock.h
@@ -873,9 +873,7 @@ lock_trx_has_sys_table_locks(
record */
#define LOCK_CONV_BY_OTHER 4096 /*!< this bit is set when the lock is created
by other transaction */
-#if (LOCK_WAIT|LOCK_GAP|LOCK_REC_NOT_GAP|LOCK_INSERT_INTENTION|LOCK_CONV_BY_OTHER)&LOCK_MODE_MASK
-# error
-#endif
+#define WSREP_BF 8192
#if (LOCK_WAIT|LOCK_GAP|LOCK_REC_NOT_GAP|LOCK_INSERT_INTENTION|LOCK_CONV_BY_OTHER)&LOCK_TYPE_MASK
# error
#endif
@@ -958,6 +956,16 @@ extern lock_sys_t* lock_sys;
mutex_exit(&lock_sys->wait_mutex); \
} while (0)
+#ifdef WITH_WSREP
+/*********************************************************************//**
+Cancels a waiting lock request and releases possible other transactions
+waiting behind it. */
+UNIV_INTERN
+void
+lock_cancel_waiting_and_release(
+/*============================*/
+ lock_t* lock); /*!< in/out: waiting lock request */
+#endif /* WITH_WSREP */
#ifndef UNIV_NONINL
#include "lock0lock.ic"
#endif
diff --git a/storage/innobase/include/rem0rec.h b/storage/innobase/include/rem0rec.h
index 2a84aee7a6f..fc60965e089 100644
--- a/storage/innobase/include/rem0rec.h
+++ b/storage/innobase/include/rem0rec.h
@@ -970,6 +970,15 @@ are given in one byte (resp. two byte) format. */
two upmost bits in a two byte offset for special purposes */
#define REC_MAX_DATA_SIZE (16 * 1024)
+#ifdef WITH_WSREP
+int wsrep_rec_get_foreign_key(
+ byte *buf, /* out: extracted key */
+ ulint *buf_len, /* in/out: length of buf */
+ const rec_t* rec, /* in: physical record */
+ dict_index_t* index_for, /* in: index for foreign table */
+ dict_index_t* index_ref, /* in: index for referenced table */
+ ibool new_protocol); /* in: protocol > 1 */
+#endif /* WITH_WSREP */
#ifndef UNIV_NONINL
#include "rem0rec.ic"
#endif
diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h
index 1e98cf690d8..3101a463158 100644
--- a/storage/innobase/include/srv0srv.h
+++ b/storage/innobase/include/srv0srv.h
@@ -257,6 +257,10 @@ extern ulong srv_flush_log_at_trx_commit;
extern uint srv_flush_log_at_timeout;
extern char srv_adaptive_flushing;
+#ifdef WITH_INNODB_DISALLOW_WRITES
+/* When this event is reset we do not allow any file writes to take place. */
+extern os_event_t srv_allow_writes_event;
+#endif /* WITH_INNODB_DISALLOW_WRITES */
/* If this flag is TRUE, then we will load the indexes' (and tables') metadata
even if they are marked as "corrupted". Mostly it is for DBA to process
corrupted index and table */
@@ -889,5 +893,13 @@ struct srv_slot_t{
# define srv_start_raw_disk_in_use 0
# define srv_file_per_table 1
#endif /* !UNIV_HOTBACKUP */
+#ifdef WITH_WSREP
+UNIV_INTERN
+void
+wsrep_srv_conc_cancel_wait(
+/*==================*/
+ trx_t* trx); /*!< in: transaction object associated with the
+ thread */
+#endif /* WITH_WSREP */
#endif
diff --git a/storage/innobase/include/sync0sync.ic b/storage/innobase/include/sync0sync.ic
index ad77ad6d5a4..cc087559fb7 100644
--- a/storage/innobase/include/sync0sync.ic
+++ b/storage/innobase/include/sync0sync.ic
@@ -204,7 +204,10 @@ mutex_enter_func(
ulint line) /*!< in: line where locked */
{
ut_ad(mutex_validate(mutex));
+#ifndef WITH_WSREP
+ /* this cannot be be granted when BF trx kills a trx in lock wait state */
ut_ad(!mutex_own(mutex));
+#endif /* WITH_WSREP */
/* Note that we do not peek at the value of lock_word before trying
the atomic test_and_set; we could peek, and possibly save time. */
diff --git a/storage/innobase/include/trx0sys.h b/storage/innobase/include/trx0sys.h
index 70f214d1ac7..9ffc8d99a7f 100644
--- a/storage/innobase/include/trx0sys.h
+++ b/storage/innobase/include/trx0sys.h
@@ -42,6 +42,9 @@ Created 3/26/1996 Heikki Tuuri
#include "read0types.h"
#include "page0types.h"
#include "ut0bh.h"
+#ifdef WITH_WSREP
+#include "trx0xa.h"
+#endif /* WITH_WSREP */
typedef UT_LIST_BASE_NODE_T(trx_t) trx_list_t;
@@ -293,6 +296,9 @@ trx_sys_update_mysql_binlog_offset(
ib_int64_t offset, /*!< in: position in that log file */
ulint field, /*!< in: offset of the MySQL log info field in
the trx sys header */
+#ifdef WITH_WSREP
+ trx_sysf_t* sys_header, /*!< in: trx sys header */
+#endif /* WITH_WSREP */
mtr_t* mtr); /*!< in: mtr */
/*****************************************************************//**
Prints to stderr the MySQL binlog offset info in the trx system header if
@@ -301,6 +307,19 @@ UNIV_INTERN
void
trx_sys_print_mysql_binlog_offset(void);
/*===================================*/
+#ifdef WITH_WSREP
+/** Update WSREP checkpoint XID in sys header. */
+void
+trx_sys_update_wsrep_checkpoint(
+ const XID* xid, /*!< in: WSREP XID */
+ trx_sysf_t* sys_header, /*!< in: sys_header */
+ mtr_t* mtr); /*!< in: mtr */
+
+void
+/** Read WSREP checkpoint XID from sys header. */
+trx_sys_read_wsrep_checkpoint(
+ XID* xid); /*!< out: WSREP XID */
+#endif /* WITH_WSREP */
/*****************************************************************//**
Prints to stderr the MySQL master log offset info in the trx system header if
the magic number shows it valid. */
@@ -529,6 +548,20 @@ this contains the same fields as TRX_SYS_MYSQL_LOG_INFO below */
within that file */
#define TRX_SYS_MYSQL_LOG_NAME 12 /*!< MySQL log file name */
+#ifdef WITH_WSREP
+/* The offset to WSREP XID headers */
+#define TRX_SYS_WSREP_XID_INFO (UNIV_PAGE_SIZE - 3500)
+#define TRX_SYS_WSREP_XID_MAGIC_N_FLD 0
+#define TRX_SYS_WSREP_XID_MAGIC_N 0x77737265
+
+/* XID field: formatID, gtrid_len, bqual_len, xid_data */
+#define TRX_SYS_WSREP_XID_LEN (4 + 4 + 4 + XIDDATASIZE)
+#define TRX_SYS_WSREP_XID_FORMAT 4
+#define TRX_SYS_WSREP_XID_GTRID_LEN 8
+#define TRX_SYS_WSREP_XID_BQUAL_LEN 12
+#define TRX_SYS_WSREP_XID_DATA 16
+#endif /* WITH_WSREP*/
+
/** Doublewrite buffer */
/* @{ */
/** The offset of the doublewrite buffer header on the trx system header page */
diff --git a/storage/innobase/include/trx0sys.ic b/storage/innobase/include/trx0sys.ic
index e097e29b551..7265a97ae25 100644
--- a/storage/innobase/include/trx0sys.ic
+++ b/storage/innobase/include/trx0sys.ic
@@ -445,7 +445,10 @@ trx_id_t
trx_sys_get_new_trx_id(void)
/*========================*/
{
+#ifndef WITH_WSREP
+ /* wsrep_fake_trx_id violates this assert */
ut_ad(mutex_own(&trx_sys->mutex));
+#endif /* WITH_WSREP */
/* VERY important: after the database is started, max_trx_id value is
divisible by TRX_SYS_TRX_ID_WRITE_MARGIN, and the following if
diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h
index bb84c1806f2..5c807c03535 100644
--- a/storage/innobase/include/trx0trx.h
+++ b/storage/innobase/include/trx0trx.h
@@ -990,6 +990,9 @@ struct trx_t{
/*------------------------------*/
char detailed_error[256]; /*!< detailed error message for last
error, or empty. */
+#ifdef WITH_WSREP
+ os_event_t wsrep_event; /* event waited for in srv_conc_slot */
+#endif /* WITH_WSREP */
};
/* Transaction isolation levels (trx->isolation_level) */
diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc
index 1152152cc77..e34710fd76c 100644
--- a/storage/innobase/lock/lock0lock.cc
+++ b/storage/innobase/lock/lock0lock.cc
@@ -49,6 +49,11 @@ Created 5/7/1996 Heikki Tuuri
#include "btr0btr.h"
#include "dict0boot.h"
+#ifdef WITH_WSREP
+extern my_bool wsrep_debug;
+extern my_bool wsrep_log_conflicts;
+#include "ha_prototypes.h"
+#endif
/* Restricts the length of search we will do in the waits-for
graph of transactions */
#define LOCK_MAX_N_STEPS_IN_DEADLOCK_CHECK 1000000
@@ -970,6 +975,11 @@ lock_rec_has_to_wait(
&& !lock_mode_compatible(static_cast<enum lock_mode>(
LOCK_MODE_MASK & type_mode),
lock_get_mode(lock2))) {
+#ifdef WITH_WSREP
+ if ((type_mode & WSREP_BF) && (lock2->type_mode & WSREP_BF)) {
+ return FALSE;
+ }
+#endif /* WITH_WSREP */
/* We have somewhat complex rules when gap type record locks
cause waits */
@@ -1518,6 +1528,11 @@ lock_rec_has_expl(
return(NULL);
}
+#ifdef WITH_WSREP
+static
+void
+lock_rec_discard(lock_t* in_lock);
+#endif
#ifdef UNIV_DEBUG
/*********************************************************************//**
Checks if some other transaction has a lock request in the queue.
@@ -1566,6 +1581,58 @@ lock_rec_other_has_expl_req(
}
#endif /* UNIV_DEBUG */
+#ifdef WITH_WSREP
+static void
+wsrep_kill_victim(const trx_t * const trx, const lock_t *lock) {
+ ut_ad(lock_mutex_own());
+ ut_ad(trx_mutex_own(lock->trx));
+ int bf_this = wsrep_thd_is_brute_force(trx->mysql_thd);
+ int bf_other =
+ wsrep_thd_is_brute_force(lock->trx->mysql_thd);
+ if ((bf_this && !bf_other) ||
+ (bf_this && bf_other && wsrep_trx_order_before(
+ trx->mysql_thd, lock->trx->mysql_thd))) {
+
+ if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: BF victim waiting\n");
+ /* cannot release lock, until our lock
+ is in the queue*/
+ } else if (lock->trx != trx) {
+ if (wsrep_log_conflicts) {
+ mutex_enter(&trx_sys->mutex);
+ if (bf_this)
+ fputs("\n*** Priority TRANSACTION:\n",
+ stderr);
+ else
+ fputs("\n*** Victim TRANSACTION:\n",
+ stderr);
+ trx_print_latched(stderr, trx, 3000);
+
+ if (bf_other)
+ fputs("\n*** Priority TRANSACTION:\n",
+ stderr);
+ else
+ fputs("\n*** Victim TRANSACTION:\n",
+ stderr);
+ trx_print_latched(stderr, lock->trx, 3000);
+
+ mutex_exit(&trx_sys->mutex);
+ fputs("*** WAITING FOR THIS LOCK TO BE GRANTED:\n",
+ stderr);
+
+ if (lock_get_type(lock) == LOCK_REC) {
+ lock_rec_print(stderr, lock);
+ } else {
+ lock_table_print(stderr, lock);
+ }
+ }
+ wsrep_innobase_kill_one_trx(trx->mysql_thd,
+ (const trx_t*) trx, lock->trx, TRUE);
+ }
+ }
+}
+#endif
/*********************************************************************//**
Checks if some other transaction has a conflicting explicit lock request
in the queue, so that we have to wait.
@@ -1595,6 +1662,11 @@ lock_rec_other_has_conflicting(
lock = lock_rec_get_next_const(heap_no, lock)) {
if (lock_rec_has_to_wait(trx, mode, lock, is_supremum)) {
+#ifdef WITH_WSREP
+ trx_mutex_enter(lock->trx);
+ wsrep_kill_victim((trx_t*)trx, (ib_lock_t*)lock);
+ trx_mutex_exit(lock->trx);
+#endif
return(lock);
}
}
@@ -1732,6 +1804,10 @@ static
lock_t*
lock_rec_create(
/*============*/
+#ifdef WITH_WSREP
+ lock_t* const c_lock, /* conflicting lock */
+ que_thr_t* thr,
+#endif
ulint type_mode,/*!< in: lock mode and wait
flag, type is ignored and
replaced by LOCK_REC */
@@ -1785,6 +1861,11 @@ lock_rec_create(
lock->trx = trx;
lock->type_mode = (type_mode & ~LOCK_TYPE_MASK) | LOCK_REC;
+#ifdef WITH_WSREP
+ if (wsrep_thd_is_brute_force(trx->mysql_thd)) {
+ lock->type_mode |= WSREP_BF;
+ }
+#endif /* WITH_WSREP */
lock->index = index;
lock->un_member.rec_lock.space = space;
@@ -1803,8 +1884,81 @@ lock_rec_create(
ut_ad(index->table->n_ref_count > 0 || !index->table->can_be_evicted);
+#ifdef WITH_WSREP
+ if (c_lock && wsrep_thd_is_brute_force(trx->mysql_thd)) {
+ lock_t *hash = (lock_t*)c_lock->hash;
+ lock_t *prev = NULL;
+
+ while (hash &&
+ wsrep_thd_is_brute_force(((lock_t*)hash)->trx->mysql_thd) &&
+ wsrep_trx_order_before(((lock_t*)hash)->trx->mysql_thd, trx->mysql_thd)){
+ prev = hash;
+ hash = (lock_t*)hash->hash;
+ }
+ lock->hash = hash;
+ if (prev) {
+ prev->hash = lock;
+ } else {
+ c_lock->hash = lock;
+ }
+ /*
+ * delayed conflict resolution '...kill_one_trx' was not called,
+ * if victim was waiting for some other lock
+ */
+ trx_mutex_enter(c_lock->trx);
+ if (c_lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
+
+ c_lock->trx->lock.was_chosen_as_deadlock_victim = TRUE;
+
+ if (wsrep_debug &&
+ c_lock->trx->lock.wait_lock != c_lock) {
+ fprintf(stderr, "WSREP: c_lock != wait lock\n");
+ lock_rec_print(stderr, c_lock);
+ lock_rec_print(stderr, c_lock->trx->lock.wait_lock);
+ }
+
+ trx->lock.que_state = TRX_QUE_LOCK_WAIT;
+ lock_set_lock_and_trx_wait(lock, trx);
+ UT_LIST_ADD_LAST(trx_locks, trx->lock.trx_locks, lock);
+
+ ut_ad(thr != NULL);
+ trx->lock.wait_thr = thr;
+ thr->state = QUE_THR_LOCK_WAIT;
+
+ /* have to release trx mutex for the duration of
+ victim lock release. This will eventually call
+ lock_grant, which wants to grant trx mutex again
+ */
+ if (caller_owns_trx_mutex) trx_mutex_exit(trx);
+ lock_cancel_waiting_and_release(
+ c_lock->trx->lock.wait_lock);
+ if (caller_owns_trx_mutex) trx_mutex_enter(trx);
+
+ /* trx might not wait for c_lock, but some other lock
+ does not matter if wait_lock was released above
+ */
+ if (c_lock->trx->lock.wait_lock == c_lock) {
+ lock_reset_lock_and_trx_wait(lock);
+ }
+ trx_mutex_exit(c_lock->trx);
+
+ if (wsrep_debug) fprintf(
+ stderr,
+ "WSREP: c_lock canceled %llu\n",
+ (ulonglong) c_lock->trx->id);
+
+ /* have to bail out here to avoid lock_set_lock... */
+ return(lock);
+ }
+ trx_mutex_exit(c_lock->trx);
+ } else {
+ HASH_INSERT(lock_t, hash, lock_sys->rec_hash,
+ lock_rec_fold(space, page_no), lock);
+ }
+#else
HASH_INSERT(lock_t, hash, lock_sys->rec_hash,
lock_rec_fold(space, page_no), lock);
+#endif /* WITH_WSREP */
if (!caller_owns_trx_mutex) {
trx_mutex_enter(trx);
@@ -1839,6 +1993,9 @@ static
dberr_t
lock_rec_enqueue_waiting(
/*=====================*/
+#ifdef WITH_WSREP
+ lock_t* c_lock, /* conflicting lock */
+#endif
ulint type_mode,/*!< in: lock mode this
transaction is requesting:
LOCK_S or LOCK_X, possibly
@@ -1897,9 +2054,20 @@ lock_rec_enqueue_waiting(
/* Enqueue the lock request that will wait
to be granted, note that we already own
the trx mutex. */
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ trx->lock.was_chosen_as_deadlock_victim) {
+ return(DB_DEADLOCK);
+ }
+ lock = lock_rec_create(
+ c_lock, thr,
+ type_mode | LOCK_WAIT, block, heap_no,
+ index, trx, TRUE);
+#else
lock = lock_rec_create(
type_mode | LOCK_WAIT, block, heap_no,
index, trx, TRUE);
+#endif /*WITH_WSREP */
} else {
ut_ad(lock->type_mode & LOCK_WAIT);
ut_ad(lock->type_mode & LOCK_CONV_BY_OTHER);
@@ -2006,7 +2174,19 @@ lock_rec_add_to_queue(
const lock_t* other_lock
= lock_rec_other_has_expl_req(mode, 0, LOCK_WAIT,
block, heap_no, trx);
+#ifdef WITH_WSREP
+ /* this can potentionally assert with wsrep */
+ if (wsrep_on(trx->mysql_thd)) {
+ if (wsrep_debug && other_lock) {
+ fprintf(stderr,
+ "WSREP: InnoDB assert ignored\n");
+ }
+ } else {
+ ut_a(!other_lock);
+ }
+#else
ut_a(!other_lock);
+#endif /* WITH_WSREP */
}
#endif /* UNIV_DEBUG */
@@ -2057,9 +2237,15 @@ lock_rec_add_to_queue(
}
somebody_waits:
- return(lock_rec_create(
+#ifdef WITH_WSREP
+ return(lock_rec_create(NULL, NULL,
type_mode, block, heap_no, index, trx,
caller_owns_trx_mutex));
+#else
+ return(lock_rec_create(
+ type_mode, block, heap_no, index, trx,
+ caller_owns_trx_mutex));
+#endif
}
/** Record locking request status */
@@ -2110,6 +2296,11 @@ lock_rec_lock_fast(
|| (LOCK_MODE_MASK & mode) == LOCK_X);
ut_ad(mode - (LOCK_MODE_MASK & mode) == LOCK_GAP
|| mode - (LOCK_MODE_MASK & mode) == 0
+#ifdef WITH_WSREP
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == 0
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == LOCK_GAP
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == LOCK_REC_NOT_GAP
+#endif /* WITH_WSREP */
|| mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP);
ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
@@ -2120,8 +2311,13 @@ lock_rec_lock_fast(
if (lock == NULL) {
if (!impl) {
/* Note that we don't own the trx mutex. */
+#ifdef WITH_WSREP
+ lock = lock_rec_create(NULL, thr,
+ mode, block, heap_no, index, trx, FALSE);
+#else
lock = lock_rec_create(
mode, block, heap_no, index, trx, FALSE);
+#endif
}
status = LOCK_REC_SUCCESS_CREATED;
@@ -2175,6 +2371,9 @@ lock_rec_lock_slow(
que_thr_t* thr) /*!< in: query thread */
{
trx_t* trx;
+#ifdef WITH_WSREP
+ lock_t* c_lock(NULL);
+#endif
lock_t* lock;
dberr_t err = DB_SUCCESS;
@@ -2187,6 +2386,11 @@ lock_rec_lock_slow(
|| (LOCK_MODE_MASK & mode) == LOCK_X);
ut_ad(mode - (LOCK_MODE_MASK & mode) == LOCK_GAP
|| mode - (LOCK_MODE_MASK & mode) == 0
+#ifdef WITH_WSREP
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == 0
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == LOCK_GAP
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == LOCK_REC_NOT_GAP
+#endif /* WITH_WSREP */
|| mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP);
ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
@@ -2217,10 +2421,15 @@ lock_rec_lock_slow(
/* The trx already has a strong enough lock on rec: do
nothing */
-
+#ifdef WITH_WSREP
+ } else if ((c_lock = (ib_lock_t*)lock_rec_other_has_conflicting(
+ static_cast<enum lock_mode>(mode),
+ block, heap_no, trx))) {
+#else
} else if (lock_rec_other_has_conflicting(
static_cast<enum lock_mode>(mode),
block, heap_no, trx)) {
+#endif
/* If another transaction has a non-gap conflicting
request in the queue, as this transaction does not
@@ -2229,8 +2438,17 @@ lock_rec_lock_slow(
ut_ad(lock == NULL);
enqueue_waiting:
+#ifdef WITH_WSREP
+ /* c_lock is NULL here if jump to enqueue_waiting happened
+ but it's ok because lock is not NULL in that case and c_lock
+ is not used. */
+ err = lock_rec_enqueue_waiting(
+ c_lock, mode, block, heap_no,
+ lock, index, thr);
+#else
err = lock_rec_enqueue_waiting(
mode, block, heap_no, lock, index, thr);
+#endif
} else if (!impl) {
/* Set the requested lock on the record, note that
@@ -2281,7 +2499,19 @@ lock_rec_lock(
|| (LOCK_MODE_MASK & mode) == LOCK_X);
ut_ad(mode - (LOCK_MODE_MASK & mode) == LOCK_GAP
|| mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP
+#ifdef WITH_WSREP
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == 0
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == LOCK_GAP
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == LOCK_REC_NOT_GAP
+#endif /* WITH_WSREP */
|| mode - (LOCK_MODE_MASK & mode) == 0);
+
+#ifdef WITH_WSREP
+ if (wsrep_thd_is_brute_force(thr_get_trx(thr)->mysql_thd)) {
+ mode |= WSREP_BF;
+ }
+#endif
+
ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
/* We try a simplified and faster subroutine for the most
@@ -3727,10 +3957,19 @@ lock_deadlock_select_victim(
if (trx_weight_ge(ctx->wait_lock->trx, ctx->start)) {
/* The joining transaction is 'smaller',
choose it as the victim and roll it back. */
-
+#ifdef WITH_WSREP
+ if (!wsrep_thd_is_brute_force(ctx->start->mysql_thd)) {
+ return(ctx->start);
+ }
+#else
return(ctx->start);
+#endif
}
-
+#ifdef WITH_WSREP
+ if (wsrep_thd_is_brute_force(ctx->wait_lock->trx->mysql_thd)) {
+ return(ctx->start);
+ }
+#endif
return(ctx->wait_lock->trx);
}
@@ -4067,6 +4306,9 @@ UNIV_INLINE
lock_t*
lock_table_create(
/*==============*/
+#ifdef WITH_WSREP
+ lock_t* c_lock, /*!< in: conflicting lock */
+#endif
dict_table_t* table, /*!< in/out: database table
in dictionary cache */
ulint type_mode,/*!< in: lock mode possibly ORed with
@@ -4111,7 +4353,27 @@ lock_table_create(
ut_ad(table->n_ref_count > 0 || !table->can_be_evicted);
UT_LIST_ADD_LAST(trx_locks, trx->lock.trx_locks, lock);
+
+#ifdef WITH_WSREP
+ if (c_lock && wsrep_thd_is_brute_force(trx->mysql_thd)) {
+ UT_LIST_INSERT_AFTER(
+ un_member.tab_lock.locks, table->locks, c_lock, lock);
+ } else {
+ UT_LIST_ADD_LAST(un_member.tab_lock.locks, table->locks, lock);
+ }
+
+ // if (c_lock && c_lock->trx->que_state == TRX_QUE_LOCK_WAIT) {
+ if (c_lock && c_lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: table c_lock in wait: %llu\n",
+ (ulonglong) lock->trx->id);
+ c_lock->trx->lock.was_chosen_as_deadlock_victim = TRUE;
+ lock_cancel_waiting_and_release(c_lock);
+ }
+
+#else
UT_LIST_ADD_LAST(un_member.tab_lock.locks, table->locks, lock);
+#endif
if (UNIV_UNLIKELY(type_mode & LOCK_WAIT)) {
@@ -4268,6 +4530,9 @@ static
dberr_t
lock_table_enqueue_waiting(
/*=======================*/
+#ifdef WITH_WSREP
+ lock_t* c_lock, /*!< in: conflicting lock */
+#endif
ulint mode, /*!< in: lock mode this transaction is
requesting */
dict_table_t* table, /*!< in/out: table */
@@ -4311,7 +4576,14 @@ lock_table_enqueue_waiting(
/* Enqueue the lock request that will wait to be granted */
- lock = lock_table_create(table, mode | LOCK_WAIT, trx);
+#ifdef WITH_WSREP
+ if (c_lock->trx->lock.was_chosen_as_deadlock_victim) {
+ return(DB_DEADLOCK);
+ }
+ lock = lock_table_create(c_lock, table, mode | LOCK_WAIT, trx);
+#else
+ lock = lock_table_create(table, mode | LOCK_WAIT, trx);
+#endif
/* Release the mutex to obey the latching order.
This is safe, because lock_deadlock_check_and_resolve()
@@ -4383,6 +4655,12 @@ lock_table_other_has_incompatible(
&& !lock_mode_compatible(lock_get_mode(lock), mode)
&& (wait || !lock_get_wait(lock))) {
+#ifdef WITH_WSREP
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: table lock abort");
+ wsrep_kill_victim((trx_t *)trx, (lock_t *)lock);
+#endif
+
return(lock);
}
}
@@ -4405,6 +4683,9 @@ lock_table(
enum lock_mode mode, /*!< in: lock mode */
que_thr_t* thr) /*!< in: query thread */
{
+#ifdef WITH_WSREP
+ lock_t *c_lock = NULL;
+#endif
trx_t* trx;
dberr_t err;
const lock_t* wait_for;
@@ -4435,8 +4716,13 @@ lock_table(
/* We have to check if the new lock is compatible with any locks
other transactions have in the table lock queue. */
+#ifdef WITH_WSREP
+ wait_for = lock_table_other_has_incompatible(
+ trx, LOCK_WAIT, table, mode);
+#else
wait_for = lock_table_other_has_incompatible(
trx, LOCK_WAIT, table, mode);
+#endif
trx_mutex_enter(trx);
@@ -4444,9 +4730,17 @@ lock_table(
mode: this trx may have to wait */
if (wait_for != NULL) {
+#ifdef WITH_WSREP
+ err = lock_table_enqueue_waiting((ib_lock_t*)wait_for, mode | flags, table, thr);
+#else
err = lock_table_enqueue_waiting(mode | flags, table, thr);
+#endif
} else {
+#ifdef WITH_WSREP
+ lock_table_create(c_lock, table, mode | flags, trx);
+#else
lock_table_create(table, mode | flags, trx);
+#endif
ut_a(!flags || mode == LOCK_S || mode == LOCK_X);
@@ -5577,6 +5871,7 @@ lock_rec_queue_validate(
if (!lock_rec_get_gap(lock) && !lock_get_wait(lock)) {
+#ifndef WITH_WSREP
enum lock_mode mode;
if (lock_get_mode(lock) == LOCK_S) {
@@ -5586,6 +5881,7 @@ lock_rec_queue_validate(
}
ut_a(!lock_rec_other_has_expl_req(
mode, 0, 0, block, heap_no, lock->trx));
+#endif /* WITH_WSREP */
} else if (lock_get_wait(lock) && !lock_rec_get_gap(lock)) {
@@ -5887,6 +6183,9 @@ lock_rec_insert_check_and_lock(
lock_t* lock;
dberr_t err;
ulint next_rec_heap_no;
+#ifdef WITH_WSREP
+ lock_t* c_lock=NULL;
+#endif
ut_ad(block->frame == page_align(rec));
ut_ad(!dict_index_is_online_ddl(index)
@@ -5943,17 +6242,30 @@ lock_rec_insert_check_and_lock(
had to wait for their insert. Both had waiting gap type lock requests
on the successor, which produced an unnecessary deadlock. */
+#ifdef WITH_WSREP
+ if ((c_lock = (ib_lock_t*)lock_rec_other_has_conflicting(
+ static_cast<enum lock_mode>(
+ LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION | WSREP_BF),
+ block, next_rec_heap_no, trx))) {
+#else
if (lock_rec_other_has_conflicting(
static_cast<enum lock_mode>(
LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION),
block, next_rec_heap_no, trx)) {
+#endif
/* Note that we may get DB_SUCCESS also here! */
trx_mutex_enter(trx);
+#ifdef WITH_WSREP
+ err = lock_rec_enqueue_waiting(c_lock,
+ LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION,
+ block, next_rec_heap_no, NULL, index, thr);
+#else
err = lock_rec_enqueue_waiting(
LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION,
block, next_rec_heap_no, NULL, index, thr);
+#endif
trx_mutex_exit(trx);
} else {
@@ -6058,6 +6370,11 @@ lock_rec_convert_impl_to_expl(
implicit lock. Because cannot lock at this moment.*/
if (rec_get_deleted_flag(rec, rec_offs_comp(offsets))
+#ifdef WITH_WSREP
+ && !wsrep_thd_is_brute_force(impl_trx->mysql_thd)
+ /* BF-BF conflict is possible if advancing into
+ lock_rec_other_has_conflicting*/
+#endif /* WITH_WSREP */
&& lock_rec_other_has_conflicting(
static_cast<enum lock_mode>
(LOCK_X | LOCK_REC_NOT_GAP), block,
diff --git a/storage/innobase/rem/rem0rec.cc b/storage/innobase/rem/rem0rec.cc
index 3a5d2f579c3..1d65016d455 100644
--- a/storage/innobase/rem/rem0rec.cc
+++ b/storage/innobase/rem/rem0rec.cc
@@ -33,6 +33,9 @@ Created 5/30/1994 Heikki Tuuri
#include "mtr0mtr.h"
#include "mtr0log.h"
#include "fts0fts.h"
+#ifdef WITH_WSREP
+#include <ha_prototypes.h>
+#endif /* WITH_WSREP */
/* PHYSICAL RECORD (OLD STYLE)
===========================
@@ -1953,3 +1956,133 @@ rec_get_trx_id(
}
# endif /* UNIV_DEBUG */
#endif /* !UNIV_HOTBACKUP */
+#ifdef WITH_WSREP
+int
+wsrep_rec_get_foreign_key(
+ byte *buf, /* out: extracted key */
+ ulint *buf_len, /* in/out: length of buf */
+ const rec_t* rec, /* in: physical record */
+ dict_index_t* index_for, /* in: index in foreign table */
+ dict_index_t* index_ref, /* in: index in referenced table */
+ ibool new_protocol) /* in: protocol > 1 */
+{
+ const byte* data;
+ ulint len;
+ ulint key_len = 0;
+ ulint i;
+ uint key_parts;
+ mem_heap_t* heap = NULL;
+ ulint offsets_[REC_OFFS_NORMAL_SIZE];
+ const ulint* offsets;
+
+ ut_ad(index_for);
+ ut_ad(index_ref);
+
+ rec_offs_init(offsets_);
+ offsets = rec_get_offsets(rec, index_for, offsets_,
+ ULINT_UNDEFINED, &heap);
+
+ ut_ad(rec_offs_validate(rec, NULL, offsets));
+
+ ut_ad(rec);
+
+ key_parts = dict_index_get_n_unique_in_tree(index_for);
+ for (i = 0;
+ i < key_parts &&
+ (index_for->type & DICT_CLUSTERED || i < key_parts - 1);
+ i++) {
+ dict_field_t* field_f =
+ dict_index_get_nth_field(index_for, i);
+ const dict_col_t* col_f = dict_field_get_col(field_f);
+ dict_field_t* field_r =
+ dict_index_get_nth_field(index_ref, i);
+ const dict_col_t* col_r = dict_field_get_col(field_r);
+
+ data = rec_get_nth_field(rec, offsets, i, &len);
+ if (key_len + ((len != UNIV_SQL_NULL) ? len + 1 : 1) >
+ *buf_len) {
+ fprintf (stderr,
+ "WSREP: FK key len exceeded %lu %lu %lu\n",
+ key_len, len, *buf_len);
+ goto err_out;
+ }
+
+ if (len == UNIV_SQL_NULL) {
+ ut_a(!(col_f->prtype & DATA_NOT_NULL));
+ *buf++ = 1;
+ key_len++;
+ } else if (!new_protocol) {
+ if (!(col_r->prtype & DATA_NOT_NULL)) {
+ *buf++ = 0;
+ key_len++;
+ }
+ memcpy(buf, data, len);
+ wsrep_innobase_mysql_sort(
+ (int)(col_f->prtype & DATA_MYSQL_TYPE_MASK),
+ (uint)dtype_get_charset_coll(col_f->prtype),
+ buf, len);
+ } else { /* new protocol */
+ if (!(col_r->prtype & DATA_NOT_NULL)) {
+ *buf++ = 0;
+ key_len++;
+ }
+ switch (col_f->mtype) {
+ case DATA_INT: {
+ byte* ptr = buf+len;
+ for (;;) {
+ ptr--;
+ *ptr = *data;
+ if (ptr == buf) {
+ break;
+ }
+ data++;
+ }
+
+ if (!(col_f->prtype & DATA_UNSIGNED)) {
+ buf[len-1] = (byte) (buf[len-1] ^ 128);
+ }
+
+ break;
+ }
+ case DATA_VARCHAR:
+ case DATA_VARMYSQL:
+ case DATA_CHAR:
+ case DATA_MYSQL:
+ /* Copy the actual data */
+ ut_memcpy(buf, data, len);
+ wsrep_innobase_mysql_sort(
+ (int)
+ (col_f->prtype & DATA_MYSQL_TYPE_MASK),
+ (uint)
+ dtype_get_charset_coll(col_f->prtype),
+ buf, len);
+ break;
+ case DATA_BLOB:
+ case DATA_BINARY:
+ memcpy(buf, data, len);
+ break;
+ default:
+ break;
+ }
+
+ key_len += len;
+ buf += len;
+ }
+ }
+
+ rec_validate(rec, offsets);
+
+ if (UNIV_LIKELY_NULL(heap)) {
+ mem_heap_free(heap);
+ }
+
+ *buf_len = key_len;
+ return DB_SUCCESS;
+
+ err_out:
+ if (UNIV_LIKELY_NULL(heap)) {
+ mem_heap_free(heap);
+ }
+ return DB_ERROR;
+}
+#endif // WITH_WSREP
diff --git a/storage/innobase/row/row0ins.cc b/storage/innobase/row/row0ins.cc
index c1c27152831..8ae1a711400 100644
--- a/storage/innobase/row/row0ins.cc
+++ b/storage/innobase/row/row0ins.cc
@@ -918,6 +918,14 @@ row_ins_invalidate_query_cache(
innobase_invalidate_query_cache(thr_get_trx(thr), buf, len);
mem_free(buf);
}
+#ifdef WITH_WSREP
+dberr_t wsrep_append_foreign_key(trx_t *trx,
+ dict_foreign_t* foreign,
+ const rec_t* clust_rec,
+ dict_index_t* clust_index,
+ ibool referenced,
+ ibool shared);
+#endif /* WITH_WSREP */
/*********************************************************************//**
Perform referential actions or checks when a parent row is deleted or updated
@@ -1272,6 +1280,18 @@ row_ins_foreign_check_on_constraint(
err = row_update_cascade_for_mysql(thr, cascade,
foreign->foreign_table);
+#ifdef WITH_WSREP
+ err = wsrep_append_foreign_key(
+ thr_get_trx(thr),
+ foreign,
+ clust_rec,
+ clust_index,
+ FALSE, FALSE);
+ if (err != DB_SUCCESS) {
+ fprintf(stderr,
+ "WSREP: foreign key append failed: %d\n", err);
+ } else
+#endif /* WITH_WSREP */
if (foreign->foreign_table->n_foreign_key_checks_running == 0) {
fprintf(stderr,
"InnoDB: error: table %s has the counter 0"
@@ -1601,7 +1621,14 @@ run_again:
if (check_ref) {
err = DB_SUCCESS;
-
+#ifdef WITH_WSREP
+ err = wsrep_append_foreign_key(
+ thr_get_trx(thr),
+ foreign,
+ rec,
+ check_index,
+ check_ref, TRUE);
+#endif /* WITH_WSREP */
goto end_scan;
} else if (foreign->type != 0) {
/* There is an ON UPDATE or ON DELETE
@@ -1893,6 +1920,9 @@ row_ins_scan_sec_index_for_duplicate(
mem_heap_t* offsets_heap)
/*!< in/out: memory heap that can be emptied */
{
+#ifdef WITH_WSREP
+ trx_t* trx = thr_get_trx(thr);
+#endif
ulint n_unique;
int cmp;
ulint n_fields_cmp;
@@ -1950,8 +1980,16 @@ row_ins_scan_sec_index_for_duplicate(
if (flags & BTR_NO_LOCKING_FLAG) {
/* Set no locks when applying log
in online table rebuild. */
+#ifdef WITH_WSREP
+ /* slave applier must not get duplicate error */
+ } else if (allow_duplicates ||
+ (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_brute_force(trx->mysql_thd))) {
+#else
} else if (allow_duplicates) {
+#endif
+
/* If the SQL-query will update or replace
duplicate key we will take X-lock for
duplicates ( REPLACE, LOAD DATAFILE REPLACE,
@@ -2145,7 +2183,13 @@ row_ins_duplicate_error_in_clust(
sure that in roll-forward we get the same duplicate
errors as in original execution */
+#ifdef WITH_WSREP
+ if (trx->duplicates ||
+ (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_brute_force(trx->mysql_thd))) {
+#else
if (trx->duplicates) {
+#endif
/* If the SQL-query will update or replace
duplicate key we will take X-lock for
@@ -2190,7 +2234,13 @@ duplicate:
offsets = rec_get_offsets(rec, cursor->index, offsets,
ULINT_UNDEFINED, &heap);
+#ifdef WITH_WSREP
+ if (trx->duplicates ||
+ (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_brute_force(trx->mysql_thd))) {
+#else
if (trx->duplicates) {
+#endif
/* If the SQL-query will update or replace
duplicate key we will take X-lock for
diff --git a/storage/innobase/row/row0upd.cc b/storage/innobase/row/row0upd.cc
index f97c0c3c82b..5d236cc40e5 100644
--- a/storage/innobase/row/row0upd.cc
+++ b/storage/innobase/row/row0upd.cc
@@ -51,6 +51,9 @@ Created 12/27/1996 Heikki Tuuri
#include "pars0sym.h"
#include "eval0eval.h"
#include "buf0lru.h"
+#ifdef WITH_WSREP
+extern my_bool wsrep_debug;
+#endif
/* What kind of latch and lock can we assume when the control comes to
@@ -170,6 +173,50 @@ func_exit:
return(is_referenced);
}
+#ifdef WITH_WSREP
+static
+ibool
+wsrep_row_upd_index_is_foreign(
+/*========================*/
+ dict_index_t* index, /*!< in: index */
+ trx_t* trx) /*!< in: transaction */
+{
+ dict_table_t* table = index->table;
+ dict_foreign_t* foreign;
+ ibool froze_data_dict = FALSE;
+ ibool is_referenced = FALSE;
+
+ if (!UT_LIST_GET_FIRST(table->foreign_list)) {
+
+ return(FALSE);
+ }
+
+ if (trx->dict_operation_lock_mode == 0) {
+ row_mysql_freeze_data_dictionary(trx);
+ froze_data_dict = TRUE;
+ }
+
+ foreign = UT_LIST_GET_FIRST(table->foreign_list);
+
+ while (foreign) {
+ if (foreign->foreign_index == index) {
+
+ is_referenced = TRUE;
+ goto func_exit;
+ }
+
+ foreign = UT_LIST_GET_NEXT(foreign_list, foreign);
+ }
+
+func_exit:
+ if (froze_data_dict) {
+ row_mysql_unfreeze_data_dictionary(trx);
+ }
+
+ return(is_referenced);
+}
+#endif /* WITH_WSREP */
+
/*********************************************************************//**
Checks if possible foreign key constraints hold after a delete of the record
under pcur.
@@ -287,7 +334,123 @@ run_again:
}
err = DB_SUCCESS;
+func_exit:
+ if (got_s_lock) {
+ row_mysql_unfreeze_data_dictionary(trx);
+ }
+ mem_heap_free(heap);
+
+ return(err);
+}
+#ifdef WITH_WSREP
+static
+dberr_t
+wsrep_row_upd_check_foreign_constraints(
+/*=================================*/
+ upd_node_t* node, /*!< in: row update node */
+ btr_pcur_t* pcur, /*!< in: cursor positioned on a record; NOTE: the
+ cursor position is lost in this function! */
+ dict_table_t* table, /*!< in: table in question */
+ dict_index_t* index, /*!< in: index of the cursor */
+ ulint* offsets,/*!< in/out: rec_get_offsets(pcur.rec, index) */
+ que_thr_t* thr, /*!< in: query thread */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ dict_foreign_t* foreign;
+ mem_heap_t* heap;
+ dtuple_t* entry;
+ trx_t* trx;
+ const rec_t* rec;
+ ulint n_ext;
+ dberr_t err;
+ ibool got_s_lock = FALSE;
+
+ if (UT_LIST_GET_FIRST(table->foreign_list) == NULL) {
+
+ return(DB_SUCCESS);
+ }
+
+ trx = thr_get_trx(thr);
+
+ rec = btr_pcur_get_rec(pcur);
+ ut_ad(rec_offs_validate(rec, index, offsets));
+
+ heap = mem_heap_create(500);
+
+ entry = row_rec_to_index_entry(rec, index, offsets,
+ &n_ext, heap);
+
+ mtr_commit(mtr);
+
+ mtr_start(mtr);
+
+ if (trx->dict_operation_lock_mode == 0) {
+ got_s_lock = TRUE;
+
+ row_mysql_freeze_data_dictionary(trx);
+ }
+
+ foreign = UT_LIST_GET_FIRST(table->foreign_list);
+
+ while (foreign) {
+ /* Note that we may have an update which updates the index
+ record, but does NOT update the first fields which are
+ referenced in a foreign key constraint. Then the update does
+ NOT break the constraint. */
+
+ if (foreign->foreign_index == index
+ && (node->is_delete
+ || row_upd_changes_first_fields_binary(
+ entry, index, node->update,
+ foreign->n_fields))) {
+
+ if (foreign->referenced_table == NULL) {
+ foreign->referenced_table =
+ dict_table_open_on_name(
+ foreign->referenced_table_name_lookup,
+ FALSE, FALSE, DICT_ERR_IGNORE_NONE);
+ }
+
+ if (foreign->referenced_table) {
+ mutex_enter(&(dict_sys->mutex));
+
+ (foreign->referenced_table
+ ->n_foreign_key_checks_running)++;
+
+ mutex_exit(&(dict_sys->mutex));
+ }
+
+ /* NOTE that if the thread ends up waiting for a lock
+ we will release dict_operation_lock temporarily!
+ But the counter on the table protects 'foreign' from
+ being dropped while the check is running. */
+
+ err = row_ins_check_foreign_constraint(
+ TRUE, foreign, table, entry, thr);
+
+ if (foreign->referenced_table) {
+ mutex_enter(&(dict_sys->mutex));
+
+ ut_a(foreign->referenced_table
+ ->n_foreign_key_checks_running > 0);
+
+ (foreign->referenced_table
+ ->n_foreign_key_checks_running)--;
+
+ mutex_exit(&(dict_sys->mutex));
+ }
+
+ if (err != DB_SUCCESS) {
+
+ goto func_exit;
+ }
+ }
+
+ foreign = UT_LIST_GET_NEXT(foreign_list, foreign);
+ }
+
+ err = DB_SUCCESS;
func_exit:
if (got_s_lock) {
row_mysql_unfreeze_data_dictionary(trx);
@@ -299,6 +462,7 @@ func_exit:
return(err);
}
+#endif /* WITH_WSREP */
/*********************************************************************//**
Creates an update node for a query graph.
@@ -1673,6 +1837,9 @@ row_upd_sec_index_entry(
index = node->index;
referenced = row_upd_index_is_referenced(index, trx);
+#ifdef WITH_WSREP
+ ibool foreign = wsrep_row_upd_index_is_foreign(index, trx);
+#endif /* WITH_WSREP */
heap = mem_heap_create(1024);
@@ -1800,6 +1967,9 @@ row_upd_sec_index_entry(
row_ins_sec_index_entry() below */
if (!rec_get_deleted_flag(
rec, dict_table_is_comp(index->table))) {
+#ifdef WITH_WSREP
+ que_node_t *parent = que_node_get_parent(node);
+#endif /* WITH_WSREP */
err = btr_cur_del_mark_set_sec_rec(
0, btr_cur, TRUE, thr, &mtr);
@@ -1817,6 +1987,37 @@ row_upd_sec_index_entry(
node, &pcur, index->table,
index, offsets, thr, &mtr);
}
+#ifdef WITH_WSREP
+ if (err == DB_SUCCESS && !referenced &&
+ !(parent && que_node_get_type(parent) ==
+ QUE_NODE_UPDATE &&
+ ((upd_node_t*)parent)->cascade_node == node) &&
+ foreign
+ ) {
+ ulint* offsets =
+ rec_get_offsets(
+ rec, index, NULL, ULINT_UNDEFINED,
+ &heap);
+ err = wsrep_row_upd_check_foreign_constraints(
+ node, &pcur, index->table,
+ index, offsets, thr, &mtr);
+ switch (err) {
+ case DB_SUCCESS:
+ case DB_NO_REFERENCED_ROW:
+ err = DB_SUCCESS;
+ break;
+ case DB_DEADLOCK:
+ if (wsrep_debug) fprintf (stderr,
+ "WSREP: sec index FK check fail for deadlock");
+ break;
+ default:
+ fprintf (stderr,
+ "WSREP: referenced FK check fail: %d",
+ (int)err);
+ break;
+ }
+ }
+#endif /* WITH_WSREP */
}
break;
}
@@ -1973,6 +2174,9 @@ row_upd_clust_rec_by_insert(
que_thr_t* thr, /*!< in: query thread */
ibool referenced,/*!< in: TRUE if index may be referenced in
a foreign key constraint */
+#ifdef WITH_WSREP
+ ibool foreign, /*!< in: TRUE if index is foreign key index */
+#endif /* WITH_WSREP */
mtr_t* mtr) /*!< in/out: mtr; gets committed here */
{
mem_heap_t* heap;
@@ -1986,6 +2190,9 @@ row_upd_clust_rec_by_insert(
rec_t* rec;
ulint* offsets = NULL;
+#ifdef WITH_WSREP
+ que_node_t *parent = que_node_get_parent(node);
+#endif /* WITH_WSREP */
ut_ad(node);
ut_ad(dict_index_is_clust(index));
@@ -2062,6 +2269,34 @@ err_exit:
goto err_exit;
}
}
+#ifdef WITH_WSREP
+ if (!referenced &&
+ !(parent && que_node_get_type(parent) == QUE_NODE_UPDATE &&
+ ((upd_node_t*)parent)->cascade_node == node) &&
+ foreign
+ ) {
+ err = wsrep_row_upd_check_foreign_constraints(
+ node, pcur, table, index, offsets, thr, mtr);
+ switch (err) {
+ case DB_SUCCESS:
+ case DB_NO_REFERENCED_ROW:
+ err = DB_SUCCESS;
+ break;
+ case DB_DEADLOCK:
+ if (wsrep_debug) fprintf (stderr,
+ "WSREP: insert FK check fail for deadlock");
+ break;
+ default:
+ fprintf (stderr,
+ "WSREP: referenced FK check fail: %d",
+ (int)err);
+ break;
+ }
+ if (err != DB_SUCCESS) {
+ goto err_exit;
+ }
+ }
+#endif /* WITH_WSREP */
}
mtr_commit(mtr);
@@ -2289,11 +2524,18 @@ row_upd_del_mark_clust_rec(
ibool referenced,
/*!< in: TRUE if index may be referenced in
a foreign key constraint */
+#ifdef WITH_WSREP
+ ibool foreign,/*!< in: TRUE if index is foreign key index */
+#endif /* WITH_WSREP */
mtr_t* mtr) /*!< in: mtr; gets committed here */
{
btr_pcur_t* pcur;
btr_cur_t* btr_cur;
dberr_t err;
+#ifdef WITH_WSREP
+ rec_t* rec;
+ que_node_t *parent = que_node_get_parent(node);
+#endif /* WITH_WSREP */
ut_ad(node);
ut_ad(dict_index_is_clust(index));
@@ -2310,8 +2552,16 @@ row_upd_del_mark_clust_rec(
/* Mark the clustered index record deleted; we do not have to check
locks, because we assume that we have an x-lock on the record */
+#ifdef WITH_WSREP
+ rec = btr_cur_get_rec(btr_cur);
+#endif /* WITH_WSREP */
+
err = btr_cur_del_mark_set_clust_rec(
+#ifdef WITH_WSREP
+ btr_cur_get_block(btr_cur), rec,
+#else
btr_cur_get_block(btr_cur), btr_cur_get_rec(btr_cur),
+#endif /* WITH_WSREP */
index, offsets, thr, mtr);
if (err == DB_SUCCESS && referenced) {
/* NOTE that the following call loses the position of pcur ! */
@@ -2319,6 +2569,32 @@ row_upd_del_mark_clust_rec(
err = row_upd_check_references_constraints(
node, pcur, index->table, index, offsets, thr, mtr);
}
+#ifdef WITH_WSREP
+ if (err == DB_SUCCESS && !referenced &&
+ !(parent && que_node_get_type(parent) == QUE_NODE_UPDATE &&
+ ((upd_node_t*)parent)->cascade_node == node) &&
+ thr_get_trx(thr) &&
+ foreign
+ ) {
+ err = wsrep_row_upd_check_foreign_constraints(
+ node, pcur, index->table, index, offsets, thr, mtr);
+ switch (err) {
+ case DB_SUCCESS:
+ case DB_NO_REFERENCED_ROW:
+ err = DB_SUCCESS;
+ break;
+ case DB_DEADLOCK:
+ if (wsrep_debug) fprintf (stderr,
+ "WSREP: clust rec FK check fail for deadlock");
+ break;
+ default:
+ fprintf (stderr,
+ "WSREP: clust rec referenced FK check fail: %d",
+ (int)err);
+ break;
+ }
+ }
+#endif /* WITH_WSREP */
mtr_commit(mtr);
@@ -2351,6 +2627,10 @@ row_upd_clust_step(
index = dict_table_get_first_index(node->table);
referenced = row_upd_index_is_referenced(index, thr_get_trx(thr));
+#ifdef WITH_WSREP
+ ibool foreign = wsrep_row_upd_index_is_foreign(
+ index, thr_get_trx(thr));
+#endif /* WITH_WSREP */
pcur = node->pcur;
@@ -2441,7 +2721,11 @@ row_upd_clust_step(
if (node->is_delete) {
err = row_upd_del_mark_clust_rec(
+#ifdef WITH_WSREP
+ node, index, offsets, thr, referenced, foreign, &mtr);
+#else
node, index, offsets, thr, referenced, &mtr);
+#endif /* WITH_WSREP */
if (err == DB_SUCCESS) {
node->state = UPD_NODE_UPDATE_ALL_SEC;
@@ -2486,7 +2770,11 @@ row_upd_clust_step(
externally! */
err = row_upd_clust_rec_by_insert(
+#ifdef WITH_WSREP
+ node, index, thr, referenced, foreign, &mtr);
+#else
node, index, thr, referenced, &mtr);
+#endif /* WITH_WSREP */
if (err != DB_SUCCESS) {
diff --git a/storage/innobase/srv/srv0conc.cc b/storage/innobase/srv/srv0conc.cc
index 820700a95a8..5affd671fbe 100644
--- a/storage/innobase/srv/srv0conc.cc
+++ b/storage/innobase/srv/srv0conc.cc
@@ -42,6 +42,10 @@ Created 2011/04/18 Sunny Bains
#include "trx0trx.h"
#include "mysql/plugin.h"
+#ifdef WITH_WSREP
+extern "C" int wsrep_trx_is_aborting(void *thd_ptr);
+extern my_bool wsrep_debug;
+#endif
/** Number of times a thread is allowed to enter InnoDB within the same
SQL query after it has once got the ticket. */
@@ -86,6 +90,9 @@ struct srv_conc_slot_t{
reserved may still be TRUE at that
point */
srv_conc_node_t srv_conc_queue; /*!< queue node */
+#ifdef WITH_WSREP
+ void *thd; /*!< to see priority */
+#endif
};
/** Queue of threads waiting to get in */
@@ -145,6 +152,9 @@ srv_conc_init(void)
conc_slot->event = os_event_create();
ut_a(conc_slot->event);
+#ifdef WITH_WSREP
+ conc_slot->thd = NULL;
+#endif /* WITH_WSREP */
}
#endif /* !HAVE_ATOMIC_BUILTINS */
}
@@ -202,6 +212,16 @@ srv_conc_enter_innodb_with_atomics(
for (;;) {
ulint sleep_in_us;
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_trx_is_aborting(trx->mysql_thd)) {
+ if (wsrep_debug)
+ fprintf(stderr,
+ "srv_conc_enter due to MUST_ABORT");
+ srv_conc_force_enter_innodb(trx);
+ return;
+ }
+#endif /* WITH_WSREP */
if (srv_conc.n_active < (lint) srv_thread_concurrency) {
ulint n_active;
@@ -319,6 +339,9 @@ srv_conc_exit_innodb_without_atomics(
slot = NULL;
if (srv_conc.n_active < (lint) srv_thread_concurrency) {
+#ifdef WITH_WSREP
+ srv_conc_slot_t* wsrep_slot;
+#endif
/* Look for a slot where a thread is waiting and no other
thread has yet released the thread */
@@ -329,6 +352,19 @@ srv_conc_exit_innodb_without_atomics(
/* No op */
}
+#ifdef WITH_WSREP
+ /* look for aborting trx, they must be released asap */
+ wsrep_slot= slot;
+ while (wsrep_slot && (wsrep_slot->wait_ended == TRUE ||
+ !wsrep_trx_is_aborting(wsrep_slot->thd))) {
+ wsrep_slot = UT_LIST_GET_NEXT(srv_conc_queue, wsrep_slot);
+ }
+ if (wsrep_slot) {
+ slot = wsrep_slot;
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: releasing aborting thd\n");
+ }
+#endif
if (slot != NULL) {
slot->wait_ended = TRUE;
@@ -384,6 +420,13 @@ retry:
return;
}
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_brute_force(trx->mysql_thd)) {
+ srv_conc_force_enter_innodb(trx);
+ return;
+ }
+#endif
/* If the transaction is not holding resources, let it sleep
for srv_thread_sleep_delay microseconds, and try again then */
@@ -450,6 +493,9 @@ retry:
/* Add to the queue */
slot->reserved = TRUE;
slot->wait_ended = FALSE;
+#ifdef WITH_WSREP
+ slot->thd = trx->mysql_thd;
+#endif
UT_LIST_ADD_LAST(srv_conc_queue, srv_conc_queue, slot);
@@ -457,6 +503,18 @@ retry:
srv_conc.n_waiting++;
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_trx_is_aborting(trx->mysql_thd)) {
+ os_fast_mutex_unlock(&srv_conc_mutex);
+ if (wsrep_debug)
+ fprintf(stderr, "srv_conc_enter due to MUST_ABORT");
+ trx->declared_to_be_inside_innodb = TRUE;
+ trx->n_tickets_to_enter_innodb = srv_n_free_tickets_to_enter;
+ return;
+ }
+ trx->wsrep_event = slot->event;
+#endif /* WITH_WSREP */
os_fast_mutex_unlock(&srv_conc_mutex);
/* Go to wait for the event; when a thread leaves InnoDB it will
@@ -472,6 +530,9 @@ retry:
os_event_wait(slot->event);
thd_wait_end(trx->mysql_thd);
+#ifdef WITH_WSREP
+ trx->wsrep_event = NULL;
+#endif /* WITH_WSREP */
trx->op_info = "";
@@ -483,6 +544,9 @@ retry:
incremented the thread counter on behalf of this thread */
slot->reserved = FALSE;
+#ifdef WITH_WSREP
+ slot->thd = NULL;
+#endif
UT_LIST_REMOVE(srv_conc_queue, srv_conc_queue, slot);
@@ -593,5 +657,32 @@ srv_conc_get_active_threads(void)
/*==============================*/
{
return(srv_conc.n_active);
- }
+}
+
+#ifdef WITH_WSREP
+UNIV_INTERN
+void
+wsrep_srv_conc_cancel_wait(
+/*==================*/
+ trx_t* trx) /*!< in: transaction object associated with the
+ thread */
+{
+#ifdef HAVE_ATOMIC_BUILTINS
+ /* aborting transactions will enter innodb by force in
+ srv_conc_enter_innodb_with_atomics(). No need to cancel here,
+ thr will wake up after os_sleep and let to enter innodb
+ */
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: conc slot cancel, no atomics\n");
+#else
+ os_fast_mutex_lock(&srv_conc_mutex);
+ if (trx->wsrep_event) {
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: conc slot cancel\n");
+ os_event_set(trx->wsrep_event);
+ }
+ os_fast_mutex_unlock(&srv_conc_mutex);
+#endif
+}
+#endif /* WITH_WSREP */
diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc
index 4c5753ac40e..d4d24e53df1 100644
--- a/storage/innobase/srv/srv0srv.cc
+++ b/storage/innobase/srv/srv0srv.cc
@@ -71,6 +71,10 @@ Created 10/8/1995 Heikki Tuuri
#include "mysql/plugin.h"
#include "mysql/service_thd_wait.h"
+#ifdef WITH_WSREP
+extern int wsrep_debug;
+extern int wsrep_trx_is_aborting(void *thd_ptr);
+#endif
/* The following is the maximum allowed duration of a lock wait. */
UNIV_INTERN ulint srv_fatal_semaphore_wait_threshold = 600;
@@ -207,6 +211,10 @@ srv_printf_innodb_monitor() will request mutex acquisition
with mutex_enter(), which will wait until it gets the mutex. */
#define MUTEX_NOWAIT(mutex_skipped) ((mutex_skipped) < MAX_MUTEX_NOWAIT)
+#ifdef WITH_INNODB_DISALLOW_WRITES
+UNIV_INTERN os_event_t srv_allow_writes_event;
+#endif /* WITH_INNODB_DISALLOW_WRITES */
+
/** The sort order table of the MySQL latin1_swedish_ci character set
collation */
UNIV_INTERN const byte* srv_latin1_ordering;
@@ -979,6 +987,14 @@ srv_init(void)
dict_ind_init();
srv_conc_init();
+#ifdef WITH_INNODB_DISALLOW_WRITES
+ /* Writes have to be enabled on init or else we hang. Thus, we
+ always set the event here regardless of innobase_disallow_writes.
+ That flag will always be 0 at this point because it isn't settable
+ via my.cnf or command line arg. */
+ srv_allow_writes_event = os_event_create();
+ os_event_set(srv_allow_writes_event);
+#endif /* WITH_INNODB_DISALLOW_WRITES */
/* Initialize some INFORMATION SCHEMA internal structures */
trx_i_s_cache_init(trx_i_s_cache);
@@ -1730,7 +1746,20 @@ loop:
if (sync_array_print_long_waits(&waiter, &sema)
&& sema == old_sema && os_thread_eq(waiter, old_waiter)) {
+#if defined(WITH_WSREP) && defined(WITH_INNODB_DISALLOW_WRITES)
+ if (srv_allow_writes_event->is_set) {
+#endif /* WITH_WSREP */
fatal_cnt++;
+#if defined(WITH_WSREP) && defined(WITH_INNODB_DISALLOW_WRITES)
+ } else {
+ fprintf(stderr,
+ "WSREP: avoiding InnoDB self crash due to long "
+ "semaphore wait of > %lu seconds\n"
+ "Server is processing SST donor operation, "
+ "fatal_cnt now: %lu",
+ (ulong) srv_fatal_semaphore_wait_threshold, fatal_cnt);
+ }
+#endif /* WITH_WSREP */
if (fatal_cnt > 10) {
fprintf(stderr,
@@ -2290,6 +2319,27 @@ srv_master_sleep(void)
srv_main_thread_op_info = "";
}
+#ifdef WITH_WSREP
+/*********************************************************************//**
+check if lock timeout was for priority thread,
+as a side effect trigger lock monitor
+@return false for regular lock timeout */
+static ibool
+wsrep_is_BF_lock_timeout(
+/*====================*/
+ srv_slot_t* slot) /* in: lock slot to check for lock priority */
+{
+ if (wsrep_on(thr_get_trx(slot->thr)->mysql_thd) &&
+ wsrep_thd_is_brute_force((thr_get_trx(slot->thr))->mysql_thd)) {
+ fprintf(stderr, "WSREP: BF lock wait long\n");
+ srv_print_innodb_monitor = TRUE;
+ srv_print_innodb_lock_monitor = TRUE;
+ os_event_set(lock_sys->timeout_event);
+ return TRUE;
+ }
+ return FALSE;
+ }
+#endif /* WITH_WSREP */
/*********************************************************************//**
The master thread controlling the server.
@return a dummy parameter */
@@ -2370,6 +2420,27 @@ suspend_thread:
OS_THREAD_DUMMY_RETURN; /* Not reached, avoid compiler warning */
}
+#ifdef WITH_WSREP_TODO
+/*********************************************************************//**
+check if lock timeout was for priority thread,
+as a side effect trigger lock monitor
+@return false for regular lock timeout */
+static ibool
+wsrep_is_BF_lock_timeout(
+/*====================*/
+ srv_slot_t* slot) /* in: lock slot to check for lock priority */
+{
+ if (wsrep_on(thr_get_trx(slot->thr)->mysql_thd) &&
+ wsrep_thd_is_brute_force((thr_get_trx(slot->thr))->mysql_thd)) {
+ fprintf(stderr, "WSREP: BF lock wait long\n");
+ srv_print_innodb_monitor = TRUE;
+ srv_print_innodb_lock_monitor = TRUE;
+ os_event_set(lock_sys->timeout_event);
+ return TRUE;
+ }
+ return FALSE;
+ }
+#endif /* WITH_WSREP */
/*********************************************************************//**
Check if purge should stop.
@return true if it should shutdown. */
diff --git a/storage/innobase/trx/trx0roll.cc b/storage/innobase/trx/trx0roll.cc
index d07e40c506d..a015d94f660 100644
--- a/storage/innobase/trx/trx0roll.cc
+++ b/storage/innobase/trx/trx0roll.cc
@@ -45,6 +45,9 @@ Created 3/26/1996 Heikki Tuuri
#include "pars0pars.h"
#include "srv0mon.h"
#include "trx0sys.h"
+#ifdef WITH_WSREP
+#include "ha_prototypes.h"
+#endif /* WITH_WSREP */
/** This many pages must be undone before a truncate is tried within
rollback */
@@ -382,6 +385,13 @@ trx_rollback_to_savepoint_for_mysql_low(
trx->op_info = "";
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ trx->lock.was_chosen_as_deadlock_victim) {
+ trx->lock.was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
+
return(err);
}
@@ -1018,6 +1028,12 @@ trx_roll_try_truncate(
if (trx->update_undo) {
trx_undo_truncate_end(trx, trx->update_undo, limit);
}
+
+#ifdef WITH_WSREP_OUT
+ if (wsrep_on(trx->mysql_thd)) {
+ trx->lock.was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif /* WITH_WSREP */
}
/***********************************************************************//**
diff --git a/storage/innobase/trx/trx0sys.cc b/storage/innobase/trx/trx0sys.cc
index 7c2bbc90ad9..0b2837a09b5 100644
--- a/storage/innobase/trx/trx0sys.cc
+++ b/storage/innobase/trx/trx0sys.cc
@@ -44,6 +44,10 @@ Created 3/26/1996 Heikki Tuuri
#include "os0file.h"
#include "read0read.h"
+#ifdef WITH_WSREP
+#include "ha_prototypes.h" /* wsrep_is_wsrep_xid() */
+#endif /* */
+
/** The file format tag structure with id and name. */
struct file_format_t {
ulint id; /*!< id of the file format */
@@ -202,9 +206,14 @@ trx_sys_update_mysql_binlog_offset(
ib_int64_t offset, /*!< in: position in that log file */
ulint field, /*!< in: offset of the MySQL log info field in
the trx sys header */
+#ifdef WITH_WSREP
+ trx_sysf_t* sys_header, /*!< in: trx sys header */
+#endif /* WITH_WSREP */
mtr_t* mtr) /*!< in: mtr */
{
+#ifndef WITH_WSREP
trx_sysf_t* sys_header;
+#endif /* !WITH_WSREP */
if (ut_strlen(file_name) >= TRX_SYS_MYSQL_LOG_NAME_LEN) {
@@ -213,7 +222,9 @@ trx_sys_update_mysql_binlog_offset(
return;
}
+#ifndef WITH_WSREP
sys_header = trx_sysf_get(mtr);
+#endif /* !WITH_WSREP */
if (mach_read_from_4(sys_header + field
+ TRX_SYS_MYSQL_LOG_MAGIC_N_FLD)
@@ -300,6 +311,124 @@ trx_sys_print_mysql_binlog_offset(void)
mtr_commit(&mtr);
}
+#ifdef WITH_WSREP
+
+#ifdef UNIV_DEBUG
+static long long trx_sys_cur_xid_seqno = -1;
+static unsigned char trx_sys_cur_xid_uuid[16];
+
+long long read_wsrep_xid_seqno(const XID* xid)
+{
+ long long seqno;
+ memcpy(&seqno, xid->data + 24, sizeof(long long));
+ return seqno;
+}
+
+void read_wsrep_xid_uuid(const XID* xid, unsigned char* buf)
+{
+ memcpy(buf, xid->data + 8, 16);
+}
+
+#endif /* UNIV_DEBUG */
+
+void
+trx_sys_update_wsrep_checkpoint(
+ const XID* xid, /*!< in: transaction XID */
+ trx_sysf_t* sys_header, /*!< in: sys_header */
+ mtr_t* mtr) /*!< in: mtr */
+{
+#ifdef UNIV_DEBUG
+ {
+ /* Check that seqno is monotonically increasing */
+ unsigned char xid_uuid[16];
+ long long xid_seqno = read_wsrep_xid_seqno(xid);
+ read_wsrep_xid_uuid(xid, xid_uuid);
+ if (!memcmp(xid_uuid, trx_sys_cur_xid_uuid, 8))
+ {
+ ut_ad(xid_seqno > trx_sys_cur_xid_seqno);
+ trx_sys_cur_xid_seqno = xid_seqno;
+ }
+ else
+ {
+ memcpy(trx_sys_cur_xid_uuid, xid_uuid, 16);
+ }
+ trx_sys_cur_xid_seqno = xid_seqno;
+ }
+#endif /* UNIV_DEBUG */
+
+ ut_ad(xid && mtr);
+ ut_a(xid->formatID == -1 || wsrep_is_wsrep_xid(xid));
+
+ if (mach_read_from_4(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_MAGIC_N_FLD)
+ != TRX_SYS_WSREP_XID_MAGIC_N) {
+ mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_MAGIC_N_FLD,
+ TRX_SYS_WSREP_XID_MAGIC_N,
+ MLOG_4BYTES, mtr);
+ }
+
+ mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_FORMAT,
+ (int)xid->formatID,
+ MLOG_4BYTES, mtr);
+ mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_GTRID_LEN,
+ (int)xid->gtrid_length,
+ MLOG_4BYTES, mtr);
+ mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_BQUAL_LEN,
+ (int)xid->bqual_length,
+ MLOG_4BYTES, mtr);
+ mlog_write_string(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_DATA,
+ (const unsigned char*) xid->data,
+ XIDDATASIZE, mtr);
+
+}
+
+void
+trx_sys_read_wsrep_checkpoint(XID* xid)
+/*===================================*/
+{
+ trx_sysf_t* sys_header;
+ mtr_t mtr;
+ ulint magic;
+
+ ut_ad(xid);
+
+ mtr_start(&mtr);
+
+ sys_header = trx_sysf_get(&mtr);
+
+ if ((magic = mach_read_from_4(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_MAGIC_N_FLD))
+ != TRX_SYS_WSREP_XID_MAGIC_N) {
+ memset(xid, 0, sizeof(*xid));
+ xid->formatID = -1;
+ trx_sys_update_wsrep_checkpoint(xid, sys_header, &mtr);
+ mtr_commit(&mtr);
+ return;
+ }
+
+ xid->formatID = (int)mach_read_from_4(
+ sys_header
+ + TRX_SYS_WSREP_XID_INFO + TRX_SYS_WSREP_XID_FORMAT);
+ xid->gtrid_length = (int)mach_read_from_4(
+ sys_header
+ + TRX_SYS_WSREP_XID_INFO + TRX_SYS_WSREP_XID_GTRID_LEN);
+ xid->bqual_length = (int)mach_read_from_4(
+ sys_header
+ + TRX_SYS_WSREP_XID_INFO + TRX_SYS_WSREP_XID_BQUAL_LEN);
+ ut_memcpy(xid->data,
+ sys_header + TRX_SYS_WSREP_XID_INFO + TRX_SYS_WSREP_XID_DATA,
+ XIDDATASIZE);
+
+ mtr_commit(&mtr);
+}
+
+#endif /* WITH_WSREP */
+
/*****************************************************************//**
Prints to stderr the MySQL master log offset info in the trx system header if
the magic number shows it valid. */
diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc
index 2dfa78b229a..fe71fa21a74 100644
--- a/storage/innobase/trx/trx0trx.cc
+++ b/storage/innobase/trx/trx0trx.cc
@@ -147,6 +147,9 @@ trx_create(void)
trx->lock.table_locks = ib_vector_create(
heap_alloc, sizeof(void**), 32);
+#ifdef WITH_WSREP
+ trx->wsrep_event = NULL;
+#endif /* WITH_WSREP */
return(trx);
}
@@ -722,6 +725,11 @@ trx_start_low(
srv_undo_logs, srv_undo_tablespaces);
}
+#ifdef WITH_WSREP
+ memset(&trx->xid, 0, sizeof(trx->xid));
+ trx->xid.formatID = -1;
+#endif /* WITH_WSREP */
+
/* The initial value for trx->no: IB_ULONGLONG_MAX is used in
read_view_open_now: */
@@ -832,7 +840,9 @@ trx_write_serialisation_history(
/*============================*/
trx_t* trx) /*!< in: transaction */
{
-
+#ifdef WITH_WSREP
+ trx_sysf_t* sys_header;
+#endif /* WITH_WSREP */
mtr_t mtr;
trx_rseg_t* rseg;
@@ -882,6 +892,15 @@ trx_write_serialisation_history(
MONITOR_INC(MONITOR_TRX_COMMIT_UNDO);
+#ifdef WITH_WSREP
+ sys_header = trx_sysf_get(&mtr);
+ /* Update latest MySQL wsrep XID in trx sys header. */
+ if (wsrep_is_wsrep_xid(&trx->xid))
+ {
+ trx_sys_update_wsrep_checkpoint(&trx->xid, sys_header, &mtr);
+ }
+#endif /* WITH_WSREP */
+
/* Update the latest MySQL binlog name and offset info
in trx sys header if MySQL binlogging is on or the database
server is a MySQL replication slave */
@@ -892,7 +911,11 @@ trx_write_serialisation_history(
trx_sys_update_mysql_binlog_offset(
trx->mysql_log_file_name,
trx->mysql_log_offset,
- TRX_SYS_MYSQL_LOG_INFO, &mtr);
+ TRX_SYS_MYSQL_LOG_INFO,
+#ifdef WITH_WSREP
+ sys_header,
+#endif /* WITH_WSREP */
+ &mtr);
trx->mysql_log_file_name = NULL;
}
@@ -1235,6 +1258,11 @@ trx_commit(
ut_ad(!trx->in_ro_trx_list);
ut_ad(!trx->in_rw_trx_list);
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd)) {
+ trx->lock.was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
trx->dict_operation = TRX_DICT_OP_NONE;
trx->error_state = DB_SUCCESS;
@@ -1329,6 +1357,10 @@ trx_commit_or_rollback_prepare(
switch (trx->state) {
case TRX_STATE_NOT_STARTED:
+#ifdef WITH_WSREP
+ ut_d(trx->start_file = __FILE__);
+ ut_d(trx->start_line = __LINE__);
+#endif /* WITH_WSREP */
trx_start_low(trx);
/* fall through */
case TRX_STATE_ACTIVE:
@@ -2054,7 +2086,7 @@ trx_start_if_not_started_xa_low(
transaction, doesn't. */
trx->support_xa = thd_supports_xa(trx->mysql_thd);
- trx_start_low(trx);
+ trx_start_if_not_started(trx);
/* fall through */
case TRX_STATE_ACTIVE:
return;
diff --git a/storage/xtradb/dict/dict0dict.cc b/storage/xtradb/dict/dict0dict.cc
index a20456fe3cf..7592a7608df 100644
--- a/storage/xtradb/dict/dict0dict.cc
+++ b/storage/xtradb/dict/dict0dict.cc
@@ -3254,7 +3254,29 @@ dict_foreign_find_index(
return(NULL);
}
-
+#ifdef WITH_WSREP
+dict_index_t*
+wsrep_dict_foreign_find_index(
+/*====================*/
+ dict_table_t* table, /*!< in: table */
+ const char** col_names, /*!< in: column names, or NULL
+ to use table->col_names */
+ const char** columns,/*!< in: array of column names */
+ ulint n_cols, /*!< in: number of columns */
+ dict_index_t* types_idx, /*!< in: NULL or an index to whose types the
+ column types must match */
+ ibool check_charsets,
+ /*!< in: whether to check charsets.
+ only has an effect if types_idx != NULL */
+ ulint check_null)
+ /*!< in: nonzero if none of the columns must
+ be declared NOT NULL */
+{
+ return dict_foreign_find_index(
+ table, col_names, columns, n_cols, types_idx, check_charsets,
+ check_null);
+}
+#endif /* WITH_WSREP */
/**********************************************************************//**
Report an error in a foreign key definition. */
static
diff --git a/storage/xtradb/handler/ha_innodb.cc b/storage/xtradb/handler/ha_innodb.cc
index 43cfa23a99f..26190d56044 100644
--- a/storage/xtradb/handler/ha_innodb.cc
+++ b/storage/xtradb/handler/ha_innodb.cc
@@ -101,6 +101,63 @@ this program; if not, write to the Free Software Foundation, Inc.,
#endif /* UNIV_DEBUG */
#include "fts0priv.h"
#include "page0zip.h"
+#ifdef WITH_WSREP
+#include "dict0priv.h"
+#include "../storage/innobase/include/ut0byte.h"
+#include <wsrep_mysqld.h>
+#include <my_md5.h>
+#if defined(HAVE_YASSL)
+#include "my_config.h"
+#include "md5.hpp"
+#elif defined(HAVE_OPENSSL)
+#include <openssl/md5.h>
+#endif
+
+extern my_bool wsrep_certify_nonPK;
+class binlog_trx_data;
+extern handlerton *binlog_hton;
+
+extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_wsrep_rollback;
+extern MYSQL_PLUGIN_IMPORT mysql_cond_t COND_wsrep_rollback;
+extern MYSQL_PLUGIN_IMPORT wsrep_aborting_thd_t wsrep_aborting_thd;
+
+static inline wsrep_ws_handle_t*
+wsrep_ws_handle(THD* thd, const trx_t* trx) {
+ return wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd),
+ (wsrep_trx_id_t)trx->id);
+}
+
+extern bool wsrep_prepare_key_for_innodb(const uchar *cache_key,
+ size_t cache_key_len,
+ const uchar* row_id,
+ size_t row_id_len,
+ wsrep_buf_t* key,
+ size_t* key_len);
+
+extern handlerton * wsrep_hton;
+extern TC_LOG* tc_log;
+extern void wsrep_cleanup_transaction(THD *thd);
+
+static int
+wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
+ my_bool signal);
+static void
+wsrep_fake_trx_id(handlerton* hton, THD *thd);
+static int innobase_wsrep_set_checkpoint(handlerton* hton, const XID* xid);
+static int innobase_wsrep_get_checkpoint(handlerton* hton, XID* xid);
+
+#ifdef WITH_WSREP
+dict_index_t*
+wsrep_dict_foreign_find_index(
+ dict_table_t* table,
+ const char** col_names,
+ const char** columns,
+ ulint n_cols,
+ dict_index_t* types_idx,
+ ibool check_charsets,
+ ulint check_null);
+
+#endif /* WITH_WSREP */
#define thd_get_trx_isolation(X) ((enum_tx_isolation)thd_tx_isolation(X))
@@ -1395,6 +1452,11 @@ innobase_srv_conc_enter_innodb(
/*===========================*/
trx_t* trx) /*!< in: transaction handle */
{
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_brute_force(trx->mysql_thd)) return;
+#endif /* WITH_WSREP */
+
if (srv_thread_concurrency) {
if (trx->n_tickets_to_enter_innodb > 0) {
@@ -1426,6 +1488,11 @@ innobase_srv_conc_exit_innodb(
/*==========================*/
trx_t* trx) /*!< in: transaction handle */
{
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_brute_force(trx->mysql_thd)) return;
+#endif /* WITH_WSREP */
+
#ifdef UNIV_SYNC_DEBUG
ut_ad(!sync_thread_levels_nonempty_trx(trx->has_search_latch));
#endif /* UNIV_SYNC_DEBUG */
@@ -1561,6 +1628,16 @@ thd_to_trx(
return(*(trx_t**) thd_ha_data(thd, innodb_hton_ptr));
}
+#ifdef WITH_WSREP
+ulonglong
+thd_to_trx_id(
+/*=======*/
+ THD* thd) /*!< in: MySQL thread */
+{
+ return(thd_to_trx(thd)->id);
+}
+#endif /* WITH_WSREP */
+
my_bool
ha_innobase::is_fake_change_enabled(THD* thd)
{
@@ -3156,6 +3233,13 @@ innobase_init(
if (srv_file_per_table)
innobase_hton->tablefile_extensions = ha_innobase_exts;
+#ifdef WITH_WSREP
+ innobase_hton->wsrep_abort_transaction=wsrep_abort_transaction;
+ innobase_hton->wsrep_set_checkpoint=innobase_wsrep_set_checkpoint;
+ innobase_hton->wsrep_get_checkpoint=innobase_wsrep_get_checkpoint;
+ innobase_hton->wsrep_fake_trx_id=wsrep_fake_trx_id;
+#endif /* WITH_WSREP */
+
ut_a(DATA_MYSQL_TRUE_VARCHAR == (ulint)MYSQL_TYPE_VARCHAR);
#ifndef DBUG_OFF
@@ -3842,10 +3926,32 @@ innobase_commit_low(
/*================*/
trx_t* trx) /*!< in: transaction handle */
{
+#ifdef WITH_WSREP
+ THD* thd = (THD*)trx->mysql_thd;
+ const char* tmp = 0;
+ if (wsrep_on((void*)thd)) {
+#ifdef WSREP_PROC_INFO
+ char info[64];
+ info[sizeof(info) - 1] = '\0';
+ snprintf(info, sizeof(info) - 1,
+ "innobase_commit_low():trx_commit_for_mysql(%lld)",
+ (long long) wsrep_thd_trx_seqno(thd));
+ tmp = thd_proc_info(thd, info);
+
+#else
+ tmp = thd_proc_info(thd, "innobase_commit_low()");
+#endif /* WSREP_PROC_INFO */
+ }
+#endif /* WITH_WSREP */
+
if (trx_is_started(trx)) {
trx_commit_for_mysql(trx);
}
+
+#ifdef WITH_WSREP
+ if (wsrep_on((void*)thd)) { thd_proc_info(thd, tmp); }
+#endif /* WITH_WSREP */
}
/*****************************************************************//**
@@ -4588,22 +4694,32 @@ innobase_kill_connection(
DBUG_ENTER("innobase_kill_connection");
DBUG_ASSERT(hton == innodb_hton_ptr);
- lock_mutex_enter();
-
- trx = thd_to_trx(thd);
-
- if (trx)
- {
- trx_mutex_enter(trx);
-
- /* Cancel a pending lock request. */
- if (trx->lock.wait_lock)
- lock_cancel_waiting_and_release(trx->lock.wait_lock);
-
- trx_mutex_exit(trx);
+#ifdef WITH_WSREP
+ wsrep_thd_LOCK(thd);
+ if (wsrep_thd_conflict_state(thd) != NO_CONFLICT) {
+ /* if victim has been signaled by BF thread and/or aborting
+ is already progressing, following query aborting is not necessary
+ any more.
+ Also, BF thread should own trx mutex for the victim, which would
+ conflict with trx_mutex_enter() below
+ */
+ wsrep_thd_UNLOCK(thd);
+ DBUG_VOID_RETURN;
}
+ wsrep_thd_UNLOCK(thd);
+#endif /* WITH_WSREP */
+ trx = thd_to_trx(thd);
- lock_mutex_exit();
+ if (trx)
+ {
+ /* Cancel a pending lock request. */
+ lock_mutex_enter();
+ trx_mutex_enter(trx);
+ if (trx->lock.wait_lock)
+ lock_cancel_waiting_and_release(trx->lock.wait_lock);
+ trx_mutex_exit(trx);
+ lock_mutex_exit();
+ }
DBUG_VOID_RETURN;
}
@@ -4713,7 +4829,11 @@ ha_innobase::max_supported_key_length() const
case 8192:
return(1536);
default:
+#ifdef WITH_WSREP
+ return(3500);
+#else
return(3500);
+#endif
}
}
@@ -5829,6 +5949,97 @@ get_field_offset(
return((uint) (field->ptr - table->record[0]));
}
+#ifdef WITH_WSREP
+UNIV_INTERN
+void
+wsrep_innobase_mysql_sort(
+/*===============*/
+ /* out: str contains sort string */
+ int mysql_type, /* in: MySQL type */
+ uint charset_number, /* in: number of the charset */
+ unsigned char* str, /* in: data field */
+ unsigned int str_length) /* in: data field length,
+ not UNIV_SQL_NULL */
+{
+ CHARSET_INFO* charset;
+ enum_field_types mysql_tp;
+
+ DBUG_ASSERT(str_length != UNIV_SQL_NULL);
+
+ mysql_tp = (enum_field_types) mysql_type;
+
+ switch (mysql_tp) {
+
+ case MYSQL_TYPE_BIT:
+ case MYSQL_TYPE_STRING:
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_VARCHAR:
+ {
+ uchar tmp_str[REC_VERSION_56_MAX_INDEX_COL_LEN];
+ uint tmp_length = REC_VERSION_56_MAX_INDEX_COL_LEN;
+
+ /* Use the charset number to pick the right charset struct for
+ the comparison. Since the MySQL function get_charset may be
+ slow before Bar removes the mutex operation there, we first
+ look at 2 common charsets directly. */
+
+ if (charset_number == default_charset_info->number) {
+ charset = default_charset_info;
+ } else if (charset_number == my_charset_latin1.number) {
+ charset = &my_charset_latin1;
+ } else {
+ charset = get_charset(charset_number, MYF(MY_WME));
+
+ if (charset == NULL) {
+ sql_print_error("InnoDB needs charset %lu for doing "
+ "a comparison, but MySQL cannot "
+ "find that charset.",
+ (ulong) charset_number);
+ ut_a(0);
+ }
+ }
+
+ ut_a(str_length <= tmp_length);
+ memcpy(tmp_str, str, str_length);
+
+ tmp_length = charset->coll->strnxfrm(charset, str, str_length,
+ str_length, tmp_str, tmp_length, 0);
+ DBUG_ASSERT(tmp_length == str_length);
+
+ break;
+ }
+ case MYSQL_TYPE_DECIMAL :
+ case MYSQL_TYPE_TINY :
+ case MYSQL_TYPE_SHORT :
+ case MYSQL_TYPE_LONG :
+ case MYSQL_TYPE_FLOAT :
+ case MYSQL_TYPE_DOUBLE :
+ case MYSQL_TYPE_NULL :
+ case MYSQL_TYPE_TIMESTAMP :
+ case MYSQL_TYPE_LONGLONG :
+ case MYSQL_TYPE_INT24 :
+ case MYSQL_TYPE_DATE :
+ case MYSQL_TYPE_TIME :
+ case MYSQL_TYPE_DATETIME :
+ case MYSQL_TYPE_YEAR :
+ case MYSQL_TYPE_NEWDATE :
+ case MYSQL_TYPE_NEWDECIMAL :
+ case MYSQL_TYPE_ENUM :
+ case MYSQL_TYPE_SET :
+ case MYSQL_TYPE_GEOMETRY :
+ break;
+ default:
+ break;
+ }
+
+ return;
+}
+#endif /* WITH_WSREP */
+
/*************************************************************//**
InnoDB uses this function to compare two data fields for which the data type
is such that we must use MySQL code to compare them. NOTE that the prototype
@@ -6338,6 +6549,268 @@ innobase_read_from_2_little_endian(
return((uint) ((ulint)(buf[0]) + 256 * ((ulint)(buf[1]))));
}
+#ifdef WITH_WSREP
+/*******************************************************************//**
+Stores a key value for a row to a buffer.
+@return key value length as stored in buff */
+UNIV_INTERN
+uint
+wsrep_store_key_val_for_row(
+/*===============================*/
+ TABLE* table,
+ uint keynr, /*!< in: key number */
+ char* buff, /*!< in/out: buffer for the key value (in MySQL
+ format) */
+ uint buff_len,/*!< in: buffer length */
+ const uchar* record,
+ ibool* key_is_null)/*!< out: full key was null */
+{
+ KEY* key_info = table->key_info + keynr;
+ KEY_PART_INFO* key_part = key_info->key_part;
+ KEY_PART_INFO* end =
+ key_part + key_info->user_defined_key_parts;
+ char* buff_start = buff;
+ enum_field_types mysql_type;
+ Field* field;
+
+ DBUG_ENTER("wsrep_store_key_val_for_row");
+
+ memset(buff, 0, buff_len);
+ *key_is_null = TRUE;
+
+ for (; key_part != end; key_part++) {
+
+ uchar sorted[REC_VERSION_56_MAX_INDEX_COL_LEN] = {'\0'};
+ ibool part_is_null = FALSE;
+
+ if (key_part->null_bit) {
+ if (record[key_part->null_offset]
+ & key_part->null_bit) {
+ *buff = 1;
+ part_is_null = TRUE;
+ } else {
+ *buff = 0;
+ }
+ buff++;
+ }
+ if (!part_is_null) *key_is_null = FALSE;
+
+ field = key_part->field;
+ mysql_type = field->type();
+
+ if (mysql_type == MYSQL_TYPE_VARCHAR) {
+ /* >= 5.0.3 true VARCHAR */
+ ulint lenlen;
+ ulint len;
+ const byte* data;
+ ulint key_len;
+ ulint true_len;
+ const CHARSET_INFO* cs;
+ int error=0;
+
+ key_len = key_part->length;
+
+ if (part_is_null) {
+ buff += key_len + 2;
+
+ continue;
+ }
+ cs = field->charset();
+
+ lenlen = (ulint)
+ (((Field_varstring*)field)->length_bytes);
+
+ data = row_mysql_read_true_varchar(&len,
+ (byte*) (record
+ + (ulint)get_field_offset(table, field)),
+ lenlen);
+
+ true_len = len;
+
+ /* For multi byte character sets we need to calculate
+ the true length of the key */
+
+ if (len > 0 && cs->mbmaxlen > 1) {
+ true_len = (ulint) cs->cset->well_formed_len(cs,
+ (const char *) data,
+ (const char *) data + len,
+ (uint) (key_len / cs->mbmaxlen),
+ &error);
+ }
+
+ /* In a column prefix index, we may need to truncate
+ the stored value: */
+
+ if (true_len > key_len) {
+ true_len = key_len;
+ }
+
+ memcpy(sorted, data, true_len);
+ wsrep_innobase_mysql_sort(
+ mysql_type, cs->number, sorted, true_len);
+
+ if (wsrep_protocol_version > 1) {
+ memcpy(buff, sorted, true_len);
+ /* Note that we always reserve the maximum possible
+ length of the true VARCHAR in the key value, though
+ only len first bytes after the 2 length bytes contain
+ actual data. The rest of the space was reset to zero
+ in the bzero() call above. */
+ buff += true_len;
+ } else {
+ buff += key_len;
+ }
+ } else if (mysql_type == MYSQL_TYPE_TINY_BLOB
+ || mysql_type == MYSQL_TYPE_MEDIUM_BLOB
+ || mysql_type == MYSQL_TYPE_BLOB
+ || mysql_type == MYSQL_TYPE_LONG_BLOB
+ /* MYSQL_TYPE_GEOMETRY data is treated
+ as BLOB data in innodb. */
+ || mysql_type == MYSQL_TYPE_GEOMETRY) {
+
+ const CHARSET_INFO* cs;
+ ulint key_len;
+ ulint true_len;
+ int error=0;
+ ulint blob_len;
+ const byte* blob_data;
+
+ ut_a(key_part->key_part_flag & HA_PART_KEY_SEG);
+
+ key_len = key_part->length;
+
+ if (part_is_null) {
+ buff += key_len + 2;
+
+ continue;
+ }
+
+ cs = field->charset();
+
+ blob_data = row_mysql_read_blob_ref(&blob_len,
+ (byte*) (record
+ + (ulint)get_field_offset(table, field)),
+ (ulint) field->pack_length());
+
+ true_len = blob_len;
+
+ ut_a(get_field_offset(table, field)
+ == key_part->offset);
+
+ /* For multi byte character sets we need to calculate
+ the true length of the key */
+
+ if (blob_len > 0 && cs->mbmaxlen > 1) {
+ true_len = (ulint) cs->cset->well_formed_len(cs,
+ (const char *) blob_data,
+ (const char *) blob_data
+ + blob_len,
+ (uint) (key_len / cs->mbmaxlen),
+ &error);
+ }
+
+ /* All indexes on BLOB and TEXT are column prefix
+ indexes, and we may need to truncate the data to be
+ stored in the key value: */
+
+ if (true_len > key_len) {
+ true_len = key_len;
+ }
+
+ memcpy(sorted, blob_data, true_len);
+ wsrep_innobase_mysql_sort(
+ mysql_type, cs->number, sorted, true_len);
+
+ memcpy(buff, sorted, true_len);
+
+ /* Note that we always reserve the maximum possible
+ length of the BLOB prefix in the key value. */
+ if (wsrep_protocol_version > 1) {
+ buff += true_len;
+ } else {
+ buff += key_len;
+ }
+ } else {
+ /* Here we handle all other data types except the
+ true VARCHAR, BLOB and TEXT. Note that the column
+ value we store may be also in a column prefix
+ index. */
+
+ const CHARSET_INFO* cs = NULL;
+ ulint true_len;
+ ulint key_len;
+ const uchar* src_start;
+ int error=0;
+ enum_field_types real_type;
+
+ key_len = key_part->length;
+
+ if (part_is_null) {
+ buff += key_len;
+
+ continue;
+ }
+
+ src_start = record + key_part->offset;
+ real_type = field->real_type();
+ true_len = key_len;
+
+ /* Character set for the field is defined only
+ to fields whose type is string and real field
+ type is not enum or set. For these fields check
+ if character set is multi byte. */
+
+ if (real_type != MYSQL_TYPE_ENUM
+ && real_type != MYSQL_TYPE_SET
+ && ( mysql_type == MYSQL_TYPE_VAR_STRING
+ || mysql_type == MYSQL_TYPE_STRING)) {
+
+ cs = field->charset();
+
+ /* For multi byte character sets we need to
+ calculate the true length of the key */
+
+ if (key_len > 0 && cs->mbmaxlen > 1) {
+
+ true_len = (ulint)
+ cs->cset->well_formed_len(cs,
+ (const char *)src_start,
+ (const char *)src_start
+ + key_len,
+ (uint) (key_len /
+ cs->mbmaxlen),
+ &error);
+ }
+ memcpy(sorted, src_start, true_len);
+ wsrep_innobase_mysql_sort(
+ mysql_type, cs->number, sorted, true_len);
+ memcpy(buff, sorted, true_len);
+ } else {
+ memcpy(buff, src_start, true_len);
+ }
+ buff += true_len;
+
+ /* Pad the unused space with spaces. */
+
+#ifdef REMOVED
+ if (true_len < key_len) {
+ ulint pad_len = key_len - true_len;
+ ut_a(!(pad_len % cs->mbminlen));
+
+ cs->cset->fill(cs, buff, pad_len,
+ 0x20 /* space */);
+ buff += pad_len;
+ }
+#endif /* REMOVED */
+ }
+ }
+
+ ut_a(buff <= buff_start + buff_len);
+
+ DBUG_RETURN((uint)(buff - buff_start));
+}
+#endif /* WITH_WSREP */
+
/*******************************************************************//**
Stores a key value for a row to a buffer.
@return key value length as stored in buff */
@@ -7185,6 +7658,9 @@ ha_innobase::write_row(
ibool auto_inc_used= FALSE;
ulint sql_command;
trx_t* trx = thd_to_trx(user_thd);
+#ifdef WITH_WSREP
+ ibool auto_inc_inserted= FALSE; /* if NULL was inserted */
+#endif
DBUG_ENTER("ha_innobase::write_row");
@@ -7220,8 +7696,19 @@ ha_innobase::write_row(
if ((sql_command == SQLCOM_ALTER_TABLE
|| sql_command == SQLCOM_OPTIMIZE
|| sql_command == SQLCOM_CREATE_INDEX
+#ifdef WITH_WSREP
+ || (wsrep_on(user_thd) && wsrep_load_data_splitting &&
+ sql_command == SQLCOM_LOAD)
+#endif /* WITH_WSREP */
|| sql_command == SQLCOM_DROP_INDEX)
&& num_write_row >= 10000) {
+#ifdef WITH_WSREP
+ if (wsrep_on(user_thd) && sql_command == SQLCOM_LOAD) {
+ WSREP_DEBUG("forced trx split for LOAD: %s",
+ wsrep_thd_query(user_thd));
+ }
+#endif /* WITH_WSREP */
+
/* ALTER TABLE is COMMITted at every 10000 copied rows.
The IX table lock for the original table has to be re-issued.
As this method will be called on a temporary table where the
@@ -7255,6 +7742,20 @@ no_commit:
*/
;
} else if (src_table == prebuilt->table) {
+#ifdef WITH_WSREP
+ switch (wsrep_run_wsrep_commit(user_thd, wsrep_hton, 1))
+ {
+ case WSREP_TRX_OK:
+ break;
+ case WSREP_TRX_ROLLBACK:
+ case WSREP_TRX_ERROR:
+ DBUG_RETURN(1);
+ }
+
+ if (binlog_hton->commit(binlog_hton, user_thd, 1))
+ DBUG_RETURN(1);
+ wsrep_post_commit(user_thd, TRUE);
+#endif /* WITH_WSREP */
/* Source table is not in InnoDB format:
no need to re-acquire locks on it. */
@@ -7265,6 +7766,19 @@ no_commit:
/* We will need an IX lock on the destination table. */
prebuilt->sql_stat_start = TRUE;
} else {
+#ifdef WITH_WSREP
+ switch (wsrep_run_wsrep_commit(user_thd, wsrep_hton, 1))
+ {
+ case WSREP_TRX_OK:
+ break;
+ case WSREP_TRX_ROLLBACK:
+ case WSREP_TRX_ERROR:
+ DBUG_RETURN(1);
+ }
+ if (binlog_hton->commit(binlog_hton, user_thd, 1))
+ DBUG_RETURN(1);
+ wsrep_post_commit(user_thd, TRUE);
+#endif /* WITH_WSREP */
/* Ensure that there are no other table locks than
LOCK_IX and LOCK_AUTO_INC on the destination table. */
@@ -7294,6 +7808,10 @@ no_commit:
innobase_get_auto_increment(). */
prebuilt->autoinc_error = DB_SUCCESS;
+#ifdef WITH_WSREP
+ auto_inc_inserted= (table->next_number_field->val_int() == 0);
+#endif
+
if ((error_result = update_auto_increment())) {
/* We don't want to mask autoinc overflow errors. */
@@ -7381,6 +7899,32 @@ no_commit:
case SQLCOM_REPLACE_SELECT:
goto set_max_autoinc;
+#ifdef WITH_WSREP
+ /* workaround for LP bug #355000, retrying the insert */
+ case SQLCOM_INSERT:
+ if (wsrep_on(current_thd) &&
+ auto_inc_inserted &&
+ wsrep_drupal_282555_workaround &&
+ !thd_test_options(current_thd,
+ OPTION_NOT_AUTOCOMMIT |
+ OPTION_BEGIN)) {
+ WSREP_DEBUG(
+ "retrying insert: %s",
+ (*wsrep_thd_query(current_thd)) ?
+ wsrep_thd_query(current_thd) :
+ (char *)"void");
+ error= DB_SUCCESS;
+ wsrep_thd_set_conflict_state(
+ current_thd, MUST_ABORT);
+ innobase_srv_conc_exit_innodb(
+ prebuilt->trx);
+ /* jump straight to func exit over
+ * later wsrep hooks */
+ goto func_exit;
+ }
+ break;
+#endif /* WITH_WSREP */
+
default:
break;
}
@@ -7440,6 +7984,21 @@ report_error:
prebuilt->table->flags,
user_thd);
+#ifdef WITH_WSREP
+ if (!error_result && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
+ wsrep_on(user_thd) && !wsrep_consistency_check(user_thd) &&
+ (sql_command != SQLCOM_LOAD ||
+ thd_binlog_format(user_thd) == BINLOG_FORMAT_ROW)) {
+
+ if (wsrep_append_keys(user_thd, false, record, NULL)) {
+ DBUG_PRINT("wsrep", ("row key failed"));
+ error_result = HA_ERR_INTERNAL_ERROR;
+ goto wsrep_error;
+ }
+ }
+wsrep_error:
+#endif /* WITH_WSREP */
+
if (error_result == HA_FTS_INVALID_DOCID) {
my_error(HA_FTS_INVALID_DOCID, MYF(0));
}
@@ -7454,6 +8013,83 @@ func_exit:
DBUG_RETURN(error_result);
}
+static
+int
+wsrep_calc_row_hash(
+/*================*/
+ byte* digest, /*!< in/out: md5 sum */
+ const uchar* row, /*!< in: row in MySQL format */
+ TABLE* table, /*!< in: table in MySQL data
+ dictionary */
+ row_prebuilt_t* prebuilt, /*!< in: InnoDB prebuilt struct */
+ THD* thd) /*!< in: user thread */
+{
+ Field* field;
+ enum_field_types field_mysql_type;
+ uint n_fields;
+ ulint len;
+ const byte* ptr;
+ ulint col_type;
+ uint i;
+
+ void *ctx = wsrep_md5_init();
+
+ n_fields = table->s->fields;
+
+ for (i = 0; i < n_fields; i++) {
+ byte null_byte=0;
+ byte true_byte=1;
+
+ field = table->field[i];
+
+ ptr = (const byte*) row + get_field_offset(table, field);
+ len = field->pack_length();
+
+ field_mysql_type = field->type();
+
+ col_type = prebuilt->table->cols[i].mtype;
+
+ switch (col_type) {
+
+ case DATA_BLOB:
+ ptr = row_mysql_read_blob_ref(&len, ptr, len);
+
+ break;
+
+ case DATA_VARCHAR:
+ case DATA_BINARY:
+ case DATA_VARMYSQL:
+ if (field_mysql_type == MYSQL_TYPE_VARCHAR) {
+ /* This is a >= 5.0.3 type true VARCHAR where
+ the real payload data length is stored in
+ 1 or 2 bytes */
+
+ ptr = row_mysql_read_true_varchar(
+ &len, ptr,
+ (ulint)
+ (((Field_varstring*)field)->length_bytes));
+
+ }
+
+ break;
+ default:
+ ;
+ }
+
+ if (field->is_null_in_record(row)) {
+ wsrep_md5_update(ctx, (char*)&null_byte, 1);
+ } else {
+ wsrep_md5_update(ctx, (char*)&true_byte, 1);
+ wsrep_md5_update(ctx, (char*)ptr, len);
+ }
+ }
+
+ wsrep_compute_md5_hash((char*)digest, ctx);
+
+ return(0);
+}
+#endif /* WITH_WSREP */
+
/**********************************************************************//**
Checks which fields have changed in a row and stores information
of them to an update vector.
@@ -7875,6 +8511,21 @@ func_exit:
innobase_active_small();
+#ifdef WITH_WSREP
+ if (!err && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
+ wsrep_on(user_thd)) {
+
+ DBUG_PRINT("wsrep", ("update row key"));
+
+ if (wsrep_append_keys(user_thd, false, old_row, new_row)) {
+ DBUG_PRINT("wsrep", ("row key failed"));
+ err = HA_ERR_INTERNAL_ERROR;
+ goto wsrep_error;
+ }
+ }
+wsrep_error:
+#endif /* WITH_WSREP */
+
if (UNIV_UNLIKELY(share->ib_table->is_corrupt)) {
DBUG_RETURN(HA_ERR_CRASHED);
}
@@ -7937,6 +8588,19 @@ ha_innobase::delete_row(
innobase_active_small();
+#ifdef WITH_WSREP
+ if (error == DB_SUCCESS && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
+ wsrep_on(user_thd)) {
+
+ if (wsrep_append_keys(user_thd, false, record, NULL)) {
+ DBUG_PRINT("wsrep", ("delete fail"));
+ error = DB_ERROR;
+ goto wsrep_error;
+ }
+ }
+wsrep_error:
+#endif /* WITH_WSREP */
+
if (UNIV_UNLIKELY(share && share->ib_table
&& share->ib_table->is_corrupt)) {
DBUG_RETURN(HA_ERR_CRASHED);
@@ -9120,6 +9784,374 @@ ha_innobase::ft_end()
rnd_end();
}
+extern
+dberr_t
+wsrep_append_foreign_key(
+/*===========================*/
+ trx_t* trx, /*!< in: trx */
+ dict_foreign_t* foreign, /*!< in: foreign key constraint */
+ const rec_t* rec, /*!<in: clustered index record */
+ dict_index_t* index, /*!<in: clustered index */
+ ibool referenced, /*!<in: is check for referenced table */
+ ibool shared) /*!<in: is shared access */
+{
+ THD* thd = (THD*)trx->mysql_thd;
+ int rcode = 0;
+ char cache_key[513] = {'\0'};
+ int cache_key_len;
+ bool const copy = true;
+ ut_a(trx);
+
+ if (!wsrep_on(trx->mysql_thd) ||
+ wsrep_thd_exec_mode(thd) != LOCAL_STATE)
+ return DB_SUCCESS;
+
+ if (!thd || !foreign ||
+ (!foreign->referenced_table && !foreign->foreign_table))
+ {
+ WSREP_INFO("FK: %s missing in: %s",
+ (!thd) ? "thread" :
+ ((!foreign) ? "constraint" :
+ ((!foreign->referenced_table) ?
+ "referenced table" : "foreign table")),
+ (thd && wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void");
+ return DB_ERROR;
+ }
+
+ if ( !((referenced) ?
+ foreign->referenced_table : foreign->foreign_table))
+ {
+ WSREP_DEBUG("pulling %s table into cache",
+ (referenced) ? "referenced" : "foreign");
+ mutex_enter(&(dict_sys->mutex));
+ if (referenced)
+ {
+ foreign->referenced_table =
+ dict_table_get_low(
+ foreign->referenced_table_name_lookup);
+ if (foreign->referenced_table)
+ {
+ foreign->referenced_index =
+ wsrep_dict_foreign_find_index(
+ foreign->referenced_table, NULL,
+ foreign->referenced_col_names,
+ foreign->n_fields,
+ foreign->foreign_index,
+ TRUE, FALSE);
+ }
+ }
+ else
+ {
+ foreign->foreign_table =
+ dict_table_get_low(
+ foreign->foreign_table_name_lookup);
+ if (foreign->foreign_table)
+ {
+ foreign->foreign_index =
+ wsrep_dict_foreign_find_index(
+ foreign->foreign_table, NULL,
+ foreign->foreign_col_names,
+ foreign->n_fields,
+ foreign->referenced_index,
+ TRUE, FALSE);
+ }
+ }
+ mutex_exit(&(dict_sys->mutex));
+ }
+
+ if ( !((referenced) ?
+ foreign->referenced_table : foreign->foreign_table))
+ {
+ WSREP_WARN("FK: %s missing in query: %s",
+ (!foreign->referenced_table) ?
+ "referenced table" : "foreign table",
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void");
+ return DB_ERROR;
+ }
+ byte key[WSREP_MAX_SUPPORTED_KEY_LENGTH+1];
+ ulint len = WSREP_MAX_SUPPORTED_KEY_LENGTH;
+
+ dict_index_t *idx_target = (referenced) ?
+ foreign->referenced_index : index;
+ dict_index_t *idx = (referenced) ?
+ UT_LIST_GET_FIRST(foreign->referenced_table->indexes) :
+ UT_LIST_GET_FIRST(foreign->foreign_table->indexes);
+ int i = 0;
+ while (idx != NULL && idx != idx_target) {
+ if (innobase_strcasecmp (idx->name, innobase_index_reserve_name) != 0) {
+ i++;
+ }
+ idx = UT_LIST_GET_NEXT(indexes, idx);
+ }
+ ut_a(idx);
+ key[0] = (char)i;
+
+ rcode = wsrep_rec_get_foreign_key(
+ &key[1], &len, rec, index, idx,
+ wsrep_protocol_version > 1);
+ if (rcode != DB_SUCCESS) {
+ WSREP_ERROR(
+ "FK key set failed: %d (%lu %lu), index: %s %s, %s",
+ rcode, referenced, shared,
+ (index && index->name) ? index->name :
+ "void index",
+ (index && index->table_name) ? index->table_name :
+ "void table",
+ wsrep_thd_query(thd));
+ return DB_ERROR;
+ }
+ strncpy(cache_key,
+ (wsrep_protocol_version > 1) ?
+ ((referenced) ?
+ foreign->referenced_table->name :
+ foreign->foreign_table->name) :
+ foreign->foreign_table->name, sizeof(cache_key) - 1);
+ cache_key_len = strlen(cache_key);
+// #define WSREP_DEBUG_PRINT
+#ifdef WSREP_DEBUG_PRINT
+ ulint j;
+ fprintf(stderr, "FK parent key, table: %s %s len: %lu ",
+ cache_key, (shared) ? "shared" : "exclusive", len+1);
+ for (j=0; j<len+1; j++) {
+ fprintf(stderr, " %hhX, ", key[j]);
+ }
+ fprintf(stderr, "\n");
+#endif
+ char *p = strchr(cache_key, '/');
+ if (p) {
+ *p = '\0';
+ } else {
+ WSREP_WARN("unexpected foreign key table %s %s",
+ foreign->referenced_table->name,
+ foreign->foreign_table->name);
+ }
+
+ wsrep_buf_t wkey_part[3];
+ wsrep_key_t wkey = {wkey_part, 3};
+ if (!wsrep_prepare_key_for_innodb(
+ (const uchar*)cache_key,
+ cache_key_len + 1,
+ (const uchar*)key, len+1,
+ wkey_part,
+ &wkey.key_parts_num)) {
+ WSREP_WARN("key prepare failed for cascaded FK: %s",
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void");
+ return DB_ERROR;
+ }
+ rcode = wsrep->append_key(
+ wsrep,
+ wsrep_ws_handle(thd, trx),
+ &wkey,
+ 1,
+ shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE,
+ copy);
+ if (rcode) {
+ DBUG_PRINT("wsrep", ("row key failed: %d", rcode));
+ WSREP_ERROR("Appending cascaded fk row key failed: %s, %d",
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void", rcode);
+ return DB_ERROR;
+ }
+
+ return DB_SUCCESS;
+}
+
+#ifdef WITH_WSREP
+static int
+wsrep_append_key(
+/*==================*/
+ THD *thd,
+ trx_t *trx,
+ TABLE_SHARE *table_share,
+ TABLE *table,
+ const char* key,
+ uint16_t key_len,
+ bool shared
+)
+{
+ DBUG_ENTER("wsrep_append_key");
+ bool const copy = true;
+#ifdef WSREP_DEBUG_PRINT
+ fprintf(stderr, "%s conn %ld, trx %llu, keylen %d, table %s ",
+ (shared) ? "Shared" : "Exclusive",
+ wsrep_thd_thread_id(thd), (long long)trx->id, key_len,
+ table_share->table_name.str);
+ for (int i=0; i<key_len; i++) {
+ fprintf(stderr, "%hhX, ", key[i]);
+ }
+ fprintf(stderr, "\n");
+#endif
+ wsrep_buf_t wkey_part[3];
+ wsrep_key_t wkey = {wkey_part, 3};
+ if (!wsrep_prepare_key_for_innodb(
+ (const uchar*)table_share->table_cache_key.str,
+ table_share->table_cache_key.length,
+ (const uchar*)key, key_len,
+ wkey_part,
+ &wkey.key_parts_num)) {
+ WSREP_WARN("key prepare failed for: %s",
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void");
+ DBUG_RETURN(-1);
+ }
+
+ int rcode = wsrep->append_key(
+ wsrep,
+ wsrep_ws_handle(thd, trx),
+ &wkey,
+ 1,
+ shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE,
+ copy);
+ if (rcode) {
+ DBUG_PRINT("wsrep", ("row key failed: %d", rcode));
+ WSREP_WARN("Appending row key failed: %s, %d",
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void", rcode);
+ DBUG_RETURN(-1);
+ }
+ DBUG_RETURN(0);
+}
+
+extern void compute_md5_hash(char *digest, const char *buf, int len);
+#define MD5_HASH compute_md5_hash
+
+int
+ha_innobase::wsrep_append_keys(
+/*==================*/
+ THD *thd,
+ bool shared,
+ const uchar* record0, /* in: row in MySQL format */
+ const uchar* record1) /* in: row in MySQL format */
+{
+ int rcode;
+ DBUG_ENTER("wsrep_append_keys");
+
+ bool key_appended = false;
+ trx_t *trx = thd_to_trx(thd);
+
+ if (table_share && table_share->tmp_table != NO_TMP_TABLE) {
+ WSREP_DEBUG("skipping tmp table DML: THD: %lu tmp: %d SQL: %s",
+ wsrep_thd_thread_id(thd),
+ table_share->tmp_table,
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void");
+ DBUG_RETURN(0);
+ }
+
+ if (wsrep_protocol_version == 0) {
+ uint len;
+ char keyval[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+ char *key = &keyval[0];
+ KEY *key_info = table->key_info;
+ ibool is_null;
+
+ len = wsrep_store_key_val_for_row(
+ table, 0, key, key_info->key_length, record0, &is_null);
+
+ if (!is_null) {
+ rcode = wsrep_append_key(
+ thd, trx, table_share, table, keyval,
+ len, shared);
+ if (rcode) DBUG_RETURN(rcode);
+ }
+ else
+ {
+ WSREP_DEBUG("NULL key skipped (proto 0): %s",
+ wsrep_thd_query(thd));
+ }
+ } else {
+ ut_a(table->s->keys <= 256);
+ uint i;
+ for (i=0; i<table->s->keys; ++i) {
+ uint len;
+ char keyval0[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+ char keyval1[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+ char* key0 = &keyval0[1];
+ char* key1 = &keyval1[1];
+ KEY* key_info = table->key_info + i;
+ ibool is_null;
+
+ dict_index_t* idx = innobase_get_index(i);
+ dict_table_t* tab = (idx) ? idx->table : NULL;
+
+ keyval0[0] = (char)i;
+ keyval1[0] = (char)i;
+
+ if (!tab) {
+ WSREP_WARN("MySQL-InnoDB key mismatch %s %s",
+ table->s->table_name.str,
+ key_info->name);
+ }
+ if (key_info->flags & HA_NOSAME ||
+ ((tab &&
+ dict_table_get_referenced_constraint(tab, idx)) ||
+ (!tab && referenced_by_foreign_key()))) {
+
+ if (key_info->flags & HA_NOSAME || shared)
+ key_appended = true;
+
+ len = wsrep_store_key_val_for_row(
+ table, i, key0, key_info->key_length,
+ record0, &is_null);
+ if (!is_null) {
+ rcode = wsrep_append_key(
+ thd, trx, table_share, table,
+ keyval0, len+1, shared);
+ if (rcode) DBUG_RETURN(rcode);
+ }
+ else
+ {
+ WSREP_DEBUG("NULL key skipped: %s",
+ wsrep_thd_query(thd));
+ }
+ if (record1) {
+ len = wsrep_store_key_val_for_row(
+ table, i, key1, key_info->key_length,
+ record1, &is_null);
+ if (!is_null && memcmp(key0, key1, len)) {
+ rcode = wsrep_append_key(
+ thd, trx, table_share,
+ table,
+ keyval1, len+1, shared);
+ if (rcode) DBUG_RETURN(rcode);
+ }
+ }
+ }
+ }
+ }
+
+ /* if no PK, calculate hash of full row, to be the key value */
+ if (!key_appended && wsrep_certify_nonPK) {
+ uchar digest[16];
+ int rcode;
+
+ wsrep_calc_row_hash(digest, record0, table, prebuilt, thd);
+ if ((rcode = wsrep_append_key(thd, trx, table_share, table,
+ (const char*) digest, 16,
+ shared))) {
+ DBUG_RETURN(rcode);
+ }
+
+ if (record1) {
+ wsrep_calc_row_hash(
+ digest, record1, table, prebuilt, thd);
+ if ((rcode = wsrep_append_key(thd, trx, table_share,
+ table,
+ (const char*) digest,
+ 16, shared))) {
+ DBUG_RETURN(rcode);
+ }
+ }
+ DBUG_RETURN(0);
+ }
+
+ DBUG_RETURN(0);
+}
+#endif /* WITH_WSREP */
+
/*********************************************************************//**
Stores a reference to the current row to 'ref' field of the handle. Note
that in the case where we have generated the clustered index for the
@@ -12893,11 +13925,18 @@ ha_innobase::external_lock(
/* used by test case */
DBUG_EXECUTE_IF("no_innodb_binlog_errors", skip = true;);
if (!skip) {
- my_error(ER_BINLOG_STMT_MODE_AND_ROW_ENGINE, MYF(0),
- " InnoDB is limited to row-logging when "
- "transaction isolation level is "
- "READ COMMITTED or READ UNCOMMITTED.");
- DBUG_RETURN(HA_ERR_LOGGING_IMPOSSIBLE);
+#ifdef WITH_WSREP
+ if (!wsrep_on(thd)
+ || wsrep_thd_exec_mode(thd) == LOCAL_STATE) {
+#endif /* WITH_WSREP */
+ my_error(ER_BINLOG_STMT_MODE_AND_ROW_ENGINE, MYF(0),
+ " InnoDB is limited to row-logging when "
+ "transaction isolation level is "
+ "READ COMMITTED or READ UNCOMMITTED.");
+ DBUG_RETURN(HA_ERR_LOGGING_IMPOSSIBLE);
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
}
}
@@ -14352,6 +15391,9 @@ innobase_xa_prepare(
time, not the current session variable value. Any possible changes
to the session variable take effect only in the next transaction */
if (!trx->support_xa) {
+#ifdef WITH_WSREP
+ thd_get_xid(thd, (MYSQL_XID*) &trx->xid);
+#endif // WITH_WSREP
return(0);
}
@@ -16704,6 +17746,297 @@ static SHOW_VAR innodb_status_variables_export[]= {
static struct st_mysql_storage_engine innobase_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
+#ifdef WITH_WSREP
+void
+wsrep_abort_slave_trx(wsrep_seqno_t bf_seqno, wsrep_seqno_t victim_seqno)
+{
+ WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be "
+ "caused by:\n\t"
+ "1) unsupported configuration options combination, please check documentation.\n\t"
+ "2) a bug in the code.\n\t"
+ "3) a database corruption.\n Node consistency compromized, "
+ "need to abort. Restart the node to resync with cluster.",
+ (long long)bf_seqno, (long long)victim_seqno);
+ abort();
+}
+int
+wsrep_innobase_kill_one_trx(void * const bf_thd_ptr,
+ const trx_t * const bf_trx,
+ trx_t *victim_trx, ibool signal)
+{
+ ut_ad(lock_mutex_own());
+ ut_ad(trx_mutex_own(victim_trx));
+ ut_ad(bf_thd_ptr);
+ ut_ad(victim_trx);
+
+ DBUG_ENTER("wsrep_innobase_kill_one_trx");
+ THD *bf_thd = bf_thd_ptr ? (THD*) bf_thd_ptr : NULL;
+ THD *thd = (THD *) victim_trx->mysql_thd;
+ int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0;
+
+ if (!thd) {
+ DBUG_PRINT("wsrep", ("no thd for conflicting lock"));
+ WSREP_WARN("no THD for trx: %lu", victim_trx->id);
+ DBUG_RETURN(1);
+ }
+ if (!bf_thd) {
+ DBUG_PRINT("wsrep", ("no BF thd for conflicting lock"));
+ WSREP_WARN("no BF THD for trx: %lu", (bf_trx) ? bf_trx->id : 0);
+ DBUG_RETURN(1);
+ }
+
+ WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
+
+ WSREP_DEBUG("BF kill (%lu, seqno: %lld), victim: (%lu) trx: %lu",
+ signal, (long long)bf_seqno,
+ wsrep_thd_thread_id(thd),
+ victim_trx->id);
+
+ WSREP_DEBUG("Aborting query: %s",
+ (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void");
+
+ wsrep_thd_LOCK(thd);
+
+ if (wsrep_thd_query_state(thd) == QUERY_EXITING) {
+ WSREP_DEBUG("kill trx EXITING for %lu", victim_trx->id);
+ wsrep_thd_UNLOCK(thd);
+ DBUG_RETURN(0);
+ }
+ if(wsrep_thd_exec_mode(thd) != LOCAL_STATE) {
+ WSREP_DEBUG("withdraw for BF trx: %lu, state: %d",
+ victim_trx->id,
+ wsrep_thd_conflict_state(thd));
+ }
+
+ switch (wsrep_thd_conflict_state(thd)) {
+ case NO_CONFLICT:
+ wsrep_thd_set_conflict_state(thd, MUST_ABORT);
+ break;
+ case MUST_ABORT:
+ WSREP_DEBUG("victim %lu in MUST ABORT state",
+ victim_trx->id);
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ DBUG_RETURN(0);
+ break;
+ case ABORTED:
+ case ABORTING: // fall through
+ default:
+ WSREP_DEBUG("victim %lu in state %d",
+ victim_trx->id, wsrep_thd_conflict_state(thd));
+ wsrep_thd_UNLOCK(thd);
+ DBUG_RETURN(0);
+ break;
+ }
+
+ switch (wsrep_thd_query_state(thd)) {
+ case QUERY_COMMITTING:
+ enum wsrep_status rcode;
+
+ WSREP_DEBUG("kill query for: %ld",
+ wsrep_thd_thread_id(thd));
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ WSREP_DEBUG("kill trx QUERY_COMMITTING for %lu",
+ victim_trx->id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
+ wsrep_abort_slave_trx(bf_seqno,
+ wsrep_thd_trx_seqno(thd));
+ } else {
+ rcode = wsrep->abort_pre_commit(
+ wsrep, bf_seqno,
+ (wsrep_trx_id_t)victim_trx->id
+ );
+
+ switch (rcode) {
+ case WSREP_WARNING:
+ WSREP_DEBUG("cancel commit warning: %lu",
+ victim_trx->id);
+ DBUG_RETURN(1);
+ break;
+ case WSREP_OK:
+ break;
+ default:
+ WSREP_ERROR(
+ "cancel commit bad exit: %d %lu",
+ rcode,
+ victim_trx->id);
+ /* unable to interrupt, must abort */
+ /* note: kill_mysql() will block, if we cannot.
+ * kill the lock holder first.
+ */
+ abort();
+ break;
+ }
+ }
+ break;
+ case QUERY_EXEC:
+ /* it is possible that victim trx is itself waiting for some
+ * other lock. We need to cancel this waiting
+ */
+ WSREP_DEBUG("kill trx QUERY_EXEC for %lu", victim_trx->id);
+
+ victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
+ if (victim_trx->lock.wait_lock) {
+ WSREP_DEBUG("victim has wait flag: %ld",
+ wsrep_thd_thread_id(thd));
+ lock_t* wait_lock = victim_trx->lock.wait_lock;
+ if (wait_lock) {
+ WSREP_DEBUG("canceling wait lock");
+ victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
+ lock_cancel_waiting_and_release(wait_lock);
+ }
+
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ } else {
+ /* abort currently executing query */
+ DBUG_PRINT("wsrep",("sending KILL_QUERY to: %ld",
+ wsrep_thd_thread_id(thd)));
+ WSREP_DEBUG("kill query for: %ld",
+ wsrep_thd_thread_id(thd));
+ wsrep_thd_UNLOCK(thd);
+ /* Note that innobase_kill_connection will take lock_mutex
+ and trx_mutex */
+ wsrep_thd_awake(thd, signal);
+
+ /* for BF thd, we need to prevent him from committing */
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
+ wsrep_abort_slave_trx(bf_seqno,
+ wsrep_thd_trx_seqno(thd));
+ }
+ }
+ break;
+ case QUERY_IDLE:
+ {
+ bool skip_abort= false;
+ wsrep_aborting_thd_t abortees;
+
+ WSREP_DEBUG("kill IDLE for %lu", victim_trx->id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
+ WSREP_DEBUG("kill BF IDLE, seqno: %lld",
+ (long long)wsrep_thd_trx_seqno(thd));
+ wsrep_thd_UNLOCK(thd);
+ wsrep_abort_slave_trx(bf_seqno,
+ wsrep_thd_trx_seqno(thd));
+ DBUG_RETURN(0);
+ }
+ /* This will lock thd from proceeding after net_read() */
+ wsrep_thd_set_conflict_state(thd, ABORTING);
+
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+
+ abortees = wsrep_aborting_thd;
+ while (abortees && !skip_abort) {
+ /* check if we have a kill message for this already */
+ if (abortees->aborting_thd == thd) {
+ skip_abort = true;
+ WSREP_WARN("duplicate thd aborter %lu",
+ wsrep_thd_thread_id(thd));
+ }
+ abortees = abortees->next;
+ }
+ if (!skip_abort) {
+ wsrep_aborting_thd_t aborting = (wsrep_aborting_thd_t)
+ my_malloc(sizeof(struct wsrep_aborting_thd),
+ MYF(0));
+ aborting->aborting_thd = thd;
+ aborting->next = wsrep_aborting_thd;
+ wsrep_aborting_thd = aborting;
+ DBUG_PRINT("wsrep",("enqueuing trx abort for %lu",
+ wsrep_thd_thread_id(thd)));
+ WSREP_DEBUG("enqueuing trx abort for (%lu)",
+ wsrep_thd_thread_id(thd));
+ }
+
+ DBUG_PRINT("wsrep",("signalling wsrep rollbacker"));
+ WSREP_DEBUG("signaling aborter");
+ mysql_cond_signal(&COND_wsrep_rollback);
+ mysql_mutex_unlock(&LOCK_wsrep_rollback);
+ wsrep_thd_UNLOCK(thd);
+
+ break;
+ }
+ default:
+ WSREP_WARN("bad wsrep query state: %d",
+ wsrep_thd_query_state(thd));
+ wsrep_thd_UNLOCK(thd);
+ break;
+ }
+
+ DBUG_RETURN(0);
+}
+static int
+wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
+ my_bool signal)
+{
+ DBUG_ENTER("wsrep_innobase_abort_thd");
+ trx_t* victim_trx = thd_to_trx(victim_thd);
+ trx_t* bf_trx = (bf_thd) ? thd_to_trx(bf_thd) : NULL;
+ WSREP_DEBUG("abort transaction: BF: %s victim: %s",
+ wsrep_thd_query(bf_thd),
+ wsrep_thd_query(victim_thd));
+
+ if (victim_trx)
+ {
+ lock_mutex_enter();
+ trx_mutex_enter(victim_trx);
+ int rcode = wsrep_innobase_kill_one_trx(bf_thd, bf_trx,
+ victim_trx, signal);
+ trx_mutex_exit(victim_trx);
+ lock_mutex_exit();
+ wsrep_srv_conc_cancel_wait(victim_trx);
+
+ DBUG_RETURN(rcode);
+ } else {
+ WSREP_DEBUG("victim does not have transaction");
+ wsrep_thd_LOCK(victim_thd);
+ wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT);
+ wsrep_thd_UNLOCK(victim_thd);
+ wsrep_thd_awake(victim_thd, signal);
+ }
+ DBUG_RETURN(-1);
+}
+
+static int innobase_wsrep_set_checkpoint(handlerton* hton, const XID* xid)
+{
+ DBUG_ASSERT(hton == innodb_hton_ptr);
+ if (wsrep_is_wsrep_xid((const void *)xid)) {
+ mtr_t mtr;
+ mtr_start(&mtr);
+ trx_sysf_t* sys_header = trx_sysf_get(&mtr);
+ trx_sys_update_wsrep_checkpoint(xid, sys_header, &mtr);
+ mtr_commit(&mtr);
+ return 0;
+ } else {
+ return 1;
+ }
+}
+
+static int innobase_wsrep_get_checkpoint(handlerton* hton, XID* xid)
+{
+ DBUG_ASSERT(hton == innodb_hton_ptr);
+ trx_sys_read_wsrep_checkpoint(xid);
+ return 0;
+}
+
+static void
+wsrep_fake_trx_id(
+/*==================*/
+ handlerton *hton,
+ THD *thd) /*!< in: user thread handle */
+{
+ mutex_enter(&trx_sys->mutex);
+ trx_id_t trx_id = trx_sys_get_new_trx_id();
+ mutex_exit(&trx_sys->mutex);
+
+ (void *)wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd), trx_id);
+}
+
+#endif /* WITH_WSREP */
+
/* plugin options */
static MYSQL_SYSVAR_ENUM(checksum_algorithm, srv_checksum_algorithm,
diff --git a/storage/xtradb/handler/ha_innodb.h b/storage/xtradb/handler/ha_innodb.h
index 773a9b6b04d..9c4a715642a 100644
--- a/storage/xtradb/handler/ha_innodb.h
+++ b/storage/xtradb/handler/ha_innodb.h
@@ -103,6 +103,10 @@ class ha_innobase: public handler
void innobase_initialize_autoinc();
dict_index_t* innobase_get_index(uint keynr);
+#ifdef WITH_WSREP
+ int wsrep_append_keys(THD *thd, bool shared,
+ const uchar* record0, const uchar* record1);
+#endif
/* Init values for the class: */
public:
ha_innobase(handlerton *hton, TABLE_SHARE *table_arg);
@@ -653,3 +657,35 @@ innobase_copy_frm_flags_from_table_share(
/*=====================================*/
dict_table_t* innodb_table, /*!< in/out: InnoDB table */
const TABLE_SHARE* table_share); /*!< in: table share */
+
+#ifdef WITH_WSREP
+#include <wsrep_mysqld.h>
+//extern "C" int wsrep_trx_order_before(void *thd1, void *thd2);
+extern "C" int wsrep_trx_is_aborting(void *thd_ptr);
+
+extern "C" bool wsrep_thd_is_wsrep_on(THD *thd);
+
+extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd);
+extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd);
+extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd);
+
+extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode);
+extern "C" void wsrep_thd_set_query_state(
+ THD *thd, enum wsrep_query_state state);
+extern "C" void wsrep_thd_set_conflict_state(
+ THD *thd, enum wsrep_conflict_state state);
+
+extern "C" void wsrep_thd_set_trx_to_replay(THD *thd, uint64 trx_id);
+
+extern "C"void wsrep_thd_LOCK(THD *thd);
+extern "C"void wsrep_thd_UNLOCK(THD *thd);
+extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd);
+extern "C" time_t wsrep_thd_query_start(THD *thd);
+extern "C" my_thread_id wsrep_thd_thread_id(THD *thd);
+extern "C" int64_t wsrep_thd_trx_seqno(THD *thd);
+extern "C" query_id_t wsrep_thd_query_id(THD *thd);
+extern "C" char * wsrep_thd_query(THD *thd);
+extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd);
+extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id);
+extern "C" void wsrep_thd_awake(THD *thd, my_bool signal);
+#endif
diff --git a/storage/xtradb/handler/handler0alter.cc b/storage/xtradb/handler/handler0alter.cc
index 9c535285d1e..d0f332513b4 100644
--- a/storage/xtradb/handler/handler0alter.cc
+++ b/storage/xtradb/handler/handler0alter.cc
@@ -47,6 +47,10 @@ Smart ALTER TABLE
#include "fts0priv.h"
#include "pars0pars.h"
+#ifdef WITH_WSREP
+//#include "wsrep_api.h"
+#include <sql_acl.h> // PROCESS_ACL
+#endif
#include "ha_innodb.h"
/** Operations for creating secondary indexes (no rebuild needed) */
diff --git a/storage/xtradb/include/dict0mem.h b/storage/xtradb/include/dict0mem.h
index bde0ce16094..81307d3aae3 100644
--- a/storage/xtradb/include/dict0mem.h
+++ b/storage/xtradb/include/dict0mem.h
@@ -470,6 +470,9 @@ be REC_VERSION_56_MAX_INDEX_COL_LEN (3072) bytes */
/** Defines the maximum fixed length column size */
#define DICT_MAX_FIXED_COL_LEN DICT_ANTELOPE_MAX_INDEX_COL_LEN
+#ifdef WITH_WSREP
+#define WSREP_MAX_SUPPORTED_KEY_LENGTH 3500
+#endif /* WITH_WSREP */
/** Data structure for a field in an index */
struct dict_field_t{
diff --git a/storage/xtradb/include/ha_prototypes.h b/storage/xtradb/include/ha_prototypes.h
index 4599547439e..d1652c1f0d8 100644
--- a/storage/xtradb/include/ha_prototypes.h
+++ b/storage/xtradb/include/ha_prototypes.h
@@ -375,6 +375,21 @@ thd_flush_log_at_trx_commit(
/*================================*/
void* thd);
+#ifdef WITH_WSREP
+UNIV_INTERN
+int
+wsrep_innobase_kill_one_trx(void *thd_ptr,
+ const trx_t *bf_trx, trx_t *victim_trx, ibool signal);
+
+extern "C" int wsrep_thd_is_brute_force(void *thd_ptr);
+int wsrep_trx_order_before(void *thd1, void *thd2);
+void wsrep_innobase_mysql_sort(int mysql_type, uint charset_number,
+ unsigned char* str, unsigned int str_length);
+//UNIV_INTERN
+int
+wsrep_on(void *thd_ptr);
+extern "C" int wsrep_is_wsrep_xid(const void*);
+#endif /* WITH_WSREP */
/**********************************************************************//**
Get the current setting of the lower_case_table_names global parameter from
mysqld.cc. We do a dirty read because for one there is no synchronization
diff --git a/storage/xtradb/include/lock0lock.h b/storage/xtradb/include/lock0lock.h
index 3a3a28ef525..d11f3b006d4 100644
--- a/storage/xtradb/include/lock0lock.h
+++ b/storage/xtradb/include/lock0lock.h
@@ -906,9 +906,7 @@ lock_trx_has_rec_x_lock(
record */
#define LOCK_CONV_BY_OTHER 4096 /*!< this bit is set when the lock is created
by other transaction */
-#if (LOCK_WAIT|LOCK_GAP|LOCK_REC_NOT_GAP|LOCK_INSERT_INTENTION|LOCK_CONV_BY_OTHER)&LOCK_MODE_MASK
-# error
-#endif
+#define WSREP_BF 8192
#if (LOCK_WAIT|LOCK_GAP|LOCK_REC_NOT_GAP|LOCK_INSERT_INTENTION|LOCK_CONV_BY_OTHER)&LOCK_TYPE_MASK
# error
#endif
diff --git a/storage/xtradb/include/rem0rec.h b/storage/xtradb/include/rem0rec.h
index 2a84aee7a6f..fc60965e089 100644
--- a/storage/xtradb/include/rem0rec.h
+++ b/storage/xtradb/include/rem0rec.h
@@ -970,6 +970,15 @@ are given in one byte (resp. two byte) format. */
two upmost bits in a two byte offset for special purposes */
#define REC_MAX_DATA_SIZE (16 * 1024)
+#ifdef WITH_WSREP
+int wsrep_rec_get_foreign_key(
+ byte *buf, /* out: extracted key */
+ ulint *buf_len, /* in/out: length of buf */
+ const rec_t* rec, /* in: physical record */
+ dict_index_t* index_for, /* in: index for foreign table */
+ dict_index_t* index_ref, /* in: index for referenced table */
+ ibool new_protocol); /* in: protocol > 1 */
+#endif /* WITH_WSREP */
#ifndef UNIV_NONINL
#include "rem0rec.ic"
#endif
diff --git a/storage/xtradb/include/srv0srv.h b/storage/xtradb/include/srv0srv.h
index d278782daa8..0d0aaa5f906 100644
--- a/storage/xtradb/include/srv0srv.h
+++ b/storage/xtradb/include/srv0srv.h
@@ -1098,4 +1098,13 @@ struct srv_slot_t{
# define srv_file_per_table 1
#endif /* !UNIV_HOTBACKUP */
+#ifdef WITH_WSREP
+UNIV_INTERN
+void
+wsrep_srv_conc_cancel_wait(
+/*==================*/
+ trx_t* trx); /*!< in: transaction object associated with the
+ thread */
+#endif /* WITH_WSREP */
+
#endif
diff --git a/storage/xtradb/include/trx0sys.h b/storage/xtradb/include/trx0sys.h
index 7b97c6e99cd..48da2bfcc3c 100644
--- a/storage/xtradb/include/trx0sys.h
+++ b/storage/xtradb/include/trx0sys.h
@@ -41,6 +41,9 @@ Created 3/26/1996 Heikki Tuuri
#include "ut0bh.h"
#include "read0types.h"
#include "page0types.h"
+#ifdef WITH_WSREP
+#include "trx0xa.h"
+#endif /* WITH_WSREP */
#include "ut0bh.h"
typedef UT_LIST_BASE_NODE_T(trx_t) trx_list_t;
@@ -314,6 +317,9 @@ trx_sys_update_mysql_binlog_offset(
ib_int64_t offset, /*!< in: position in that log file */
ulint field, /*!< in: offset of the MySQL log info field in
the trx sys header */
+#ifdef WITH_WSREP
+ trx_sysf_t* sys_header, /*!< in: trx sys header */
+#endif /* WITH_WSREP */
mtr_t* mtr); /*!< in: mtr */
/*****************************************************************//**
Prints to stderr the MySQL binlog offset info in the trx system header if
@@ -479,6 +485,19 @@ trx_sys_validate_trx_list(void);
/*===========================*/
#endif /* UNIV_DEBUG */
+#ifdef WITH_WSREP
+/** Update WSREP checkpoint XID in sys header. */
+void
+trx_sys_update_wsrep_checkpoint(
+ const XID* xid, /*!< in: WSREP XID */
+ trx_sysf_t* sys_header, /*!< in: sys_header */
+ mtr_t* mtr); /*!< in: mtr */
+void
+/** Read WSREP checkpoint XID from sys header. */
+trx_sys_read_wsrep_checkpoint(
+ XID* xid); /*!< out: WSREP XID */
+#endif /* WITH_WSREP */
+
/* The automatically created system rollback segment has this id */
#define TRX_SYS_SYSTEM_RSEG_ID 0
@@ -550,6 +569,22 @@ this contains the same fields as TRX_SYS_MYSQL_LOG_INFO below */
within that file */
#define TRX_SYS_MYSQL_LOG_NAME 12 /*!< MySQL log file name */
+#ifdef WITH_WSREP
+/* We hijack TRX_SYS_MYSQL_MASTER_LOG_INFO, it seems to be completely unused
+ otherwise (see comments for MySQL bug #34058). */
+/** */
+#define TRX_SYS_WSREP_XID_INFO TRX_SYS_MYSQL_MASTER_LOG_INFO
+#define TRX_SYS_WSREP_XID_MAGIC_N_FLD 0
+#define TRX_SYS_WSREP_XID_MAGIC_N 0x77737265
+
+/* XID field: formatID, gtrid_len, bqual_len, xid_data */
+#define TRX_SYS_WSREP_XID_LEN (4 + 4 + 4 + XIDDATASIZE)
+#define TRX_SYS_WSREP_XID_FORMAT 4
+#define TRX_SYS_WSREP_XID_GTRID_LEN 8
+#define TRX_SYS_WSREP_XID_BQUAL_LEN 12
+#define TRX_SYS_WSREP_XID_DATA 16
+#endif /* WITH_WSREP*/
+
/** Doublewrite buffer */
/* @{ */
/** The offset of the doublewrite buffer header on the trx system header page */
diff --git a/storage/xtradb/include/trx0trx.h b/storage/xtradb/include/trx0trx.h
index 82e9a90fcfb..18a178d5e94 100644
--- a/storage/xtradb/include/trx0trx.h
+++ b/storage/xtradb/include/trx0trx.h
@@ -1028,6 +1028,9 @@ struct trx_t{
/*------------------------------*/
char detailed_error[256]; /*!< detailed error message for last
error, or empty. */
+#ifdef WITH_WSREP
+ os_event_t wsrep_event; /* event waited for in srv_conc_slot */
+#endif /* WITH_WSREP */
/*------------------------------*/
ulint io_reads;
ib_uint64_t io_read;
diff --git a/storage/xtradb/lock/lock0lock.cc b/storage/xtradb/lock/lock0lock.cc
index 3e60680882a..559b0398147 100644
--- a/storage/xtradb/lock/lock0lock.cc
+++ b/storage/xtradb/lock/lock0lock.cc
@@ -50,6 +50,12 @@ Created 5/7/1996 Heikki Tuuri
#include "dict0boot.h"
#include <set>
+#ifdef WITH_WSREP
+extern my_bool wsrep_debug;
+extern my_bool wsrep_log_conflicts;
+#include "ha_prototypes.h"
+#endif
+
/* Restricts the length of search we will do in the waits-for
graph of transactions */
#define LOCK_MAX_N_STEPS_IN_DEADLOCK_CHECK 1000000
@@ -973,6 +979,15 @@ lock_rec_has_to_wait(
LOCK_MODE_MASK & type_mode),
lock_get_mode(lock2))) {
+#ifdef WITH_WSREP
+ if ((type_mode & WSREP_BF) && (lock2->type_mode & WSREP_BF)) {
+ if (wsrep_debug)
+ fprintf(stderr, "BF locks in conflict: " TRX_ID_FMT " " TRX_ID_FMT "\n",
+ trx->id, lock2->trx->id);
+ // not in return FALSE;
+ }
+#endif /* WITH_WSREP */
+
/* We have somewhat complex rules when gap type record locks
cause waits */
@@ -1520,6 +1535,12 @@ lock_rec_has_expl(
return(NULL);
}
+#ifdef WITH_WSREP
+static
+void
+lock_rec_discard(lock_t* in_lock);
+#endif /* WITH_WSREP */
+
#ifdef UNIV_DEBUG
/*********************************************************************//**
Checks if some other transaction has a lock request in the queue.
@@ -1568,6 +1589,57 @@ lock_rec_other_has_expl_req(
}
#endif /* UNIV_DEBUG */
+#ifdef WITH_WSREP
+static void
+wsrep_kill_victim(trx_t *trx, lock_t *lock) {
+ int bf_this = wsrep_thd_is_brute_force(trx->mysql_thd);
+ int bf_other =
+ wsrep_thd_is_brute_force(lock->trx->mysql_thd);
+ if ((bf_this && !bf_other) ||
+ (bf_this && bf_other && wsrep_trx_order_before(
+ trx->mysql_thd, lock->trx->mysql_thd))) {
+
+ if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: BF victim waiting\n");
+ /* cannot release lock, until our lock
+ is in the queue*/
+ } else if (lock->trx != trx) {
+ if (wsrep_log_conflicts) {
+ mutex_enter(&trx_sys->mutex);
+ if (bf_this)
+ fputs("\n*** Priority TRANSACTION:\n",
+ stderr);
+ else
+ fputs("\n*** Victim TRANSACTION:\n",
+ stderr);
+ trx_print_latched(stderr, trx, 3000);
+
+ if (bf_other)
+ fputs("\n*** Priority TRANSACTION:\n",
+ stderr);
+ else
+ fputs("\n*** Victim TRANSACTION:\n",
+ stderr);
+ trx_print_latched(stderr, lock->trx, 3000);
+
+ mutex_exit(&trx_sys->mutex);
+ fputs("*** WAITING FOR THIS LOCK TO BE GRANTED:\n",
+ stderr);
+
+ if (lock_get_type(lock) == LOCK_REC) {
+ lock_rec_print(stderr, lock);
+ } else {
+ lock_table_print(stderr, lock);
+ }
+ }
+ wsrep_innobase_kill_one_trx(trx->mysql_thd,
+ (const trx_t*)trx, lock->trx, TRUE);
+ }
+ }
+}
+#endif /* WITH_WSREP */
+
/*********************************************************************//**
Checks if some other transaction has a conflicting explicit lock request
in the queue, so that we have to wait.
@@ -1597,6 +1669,11 @@ lock_rec_other_has_conflicting(
lock = lock_rec_get_next_const(heap_no, lock)) {
if (lock_rec_has_to_wait(trx, mode, lock, is_supremum)) {
+#ifdef WITH_WSREP
+ trx_mutex_enter(lock->trx);
+ wsrep_kill_victim((trx_t*)trx, (ib_lock_t*)lock);
+ trx_mutex_exit(lock->trx);
+#endif
return(lock);
}
}
@@ -1734,6 +1811,10 @@ static
lock_t*
lock_rec_create(
/*============*/
+#ifdef WITH_WSREP
+ lock_t* const c_lock, /* conflicting lock */
+ que_thr_t* thr,
+#endif
ulint type_mode,/*!< in: lock mode and wait
flag, type is ignored and
replaced by LOCK_REC */
@@ -1787,8 +1868,13 @@ lock_rec_create(
lock->trx = trx;
lock->type_mode = (type_mode & ~LOCK_TYPE_MASK) | LOCK_REC;
- lock->index = index;
+#ifdef WITH_WSREP
+ if (wsrep_thd_is_brute_force(trx->mysql_thd)) {
+ lock->type_mode |= WSREP_BF;
+ }
+#endif /* WITH_WSREP */
+ lock->index = index;
lock->un_member.rec_lock.space = space;
lock->un_member.rec_lock.page_no = page_no;
lock->un_member.rec_lock.n_bits = n_bytes * 8;
@@ -1805,8 +1891,84 @@ lock_rec_create(
ut_ad(index->table->n_ref_count > 0 || !index->table->can_be_evicted);
+#ifdef WITH_WSREP
+ if (c_lock && wsrep_thd_is_brute_force(trx->mysql_thd)) {
+ lock_t *hash = (lock_t *)c_lock->hash;
+ lock_t *prev = NULL;
+
+ while (hash &&
+ wsrep_thd_is_brute_force(
+ ((lock_t *)hash)->trx->mysql_thd) &&
+ wsrep_trx_order_before(
+ ((lock_t *)hash)->trx->mysql_thd,
+ trx->mysql_thd)) {
+ prev = hash;
+ hash = (lock_t *)hash->hash;
+ }
+ lock->hash = hash;
+ if (prev) {
+ prev->hash = lock;
+ } else {
+ c_lock->hash = lock;
+ }
+ /*
+ * delayed conflict resolution '...kill_one_trx' was not called,
+ * if victim was waiting for some other lock
+ */
+ trx_mutex_enter(c_lock->trx);
+ if (c_lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
+
+ c_lock->trx->lock.was_chosen_as_deadlock_victim = TRUE;
+
+ if (wsrep_debug &&
+ c_lock->trx->lock.wait_lock != c_lock) {
+ fprintf(stderr, "WSREP: c_lock != wait lock\n");
+ lock_rec_print(stderr, c_lock);
+ lock_rec_print(stderr, c_lock->trx->lock.wait_lock);
+ }
+
+ trx->lock.que_state = TRX_QUE_LOCK_WAIT;
+ lock_set_lock_and_trx_wait(lock, trx);
+ UT_LIST_ADD_LAST(trx_locks, trx->lock.trx_locks, lock);
+
+ ut_ad(thr != NULL);
+ trx->lock.wait_thr = thr;
+ thr->state = QUE_THR_LOCK_WAIT;
+
+ /* have to release trx mutex for the duration of
+ victim lock release. This will eventually call
+ lock_grant, which wants to grant trx mutex again
+ */
+ if (caller_owns_trx_mutex) trx_mutex_exit(trx);
+ lock_cancel_waiting_and_release(
+ c_lock->trx->lock.wait_lock);
+ if (caller_owns_trx_mutex) trx_mutex_enter(trx);
+
+ /* trx might not wait for c_lock, but some other lock
+ does not matter if wait_lock was released above
+ */
+ if (c_lock->trx->lock.wait_lock == c_lock) {
+ lock_reset_lock_and_trx_wait(lock);
+ }
+ trx_mutex_exit(c_lock->trx);
+
+ if (wsrep_debug) fprintf(
+ stderr,
+ "WSREP: c_lock canceled %llu\n",
+ (ulonglong) c_lock->trx->id);
+
+ /* have to bail out here to avoid lock_set_lock... */
+ return(lock);
+ }
+ trx_mutex_exit(c_lock->trx);
+ } else {
+ HASH_INSERT(lock_t, hash, lock_sys->rec_hash,
+ lock_rec_fold(space, page_no), lock);
+ }
+#else
HASH_INSERT(lock_t, hash, lock_sys->rec_hash,
lock_rec_fold(space, page_no), lock);
+#endif /* WITH_WSREP */
lock_sys->rec_num++;
@@ -1843,6 +2005,9 @@ static
dberr_t
lock_rec_enqueue_waiting(
/*=====================*/
+#ifdef WITH_WSREP
+ lock_t* c_lock, /* conflicting lock */
+#endif
ulint type_mode,/*!< in: lock mode this
transaction is requesting:
LOCK_S or LOCK_X, possibly
@@ -1905,9 +2070,20 @@ lock_rec_enqueue_waiting(
/* Enqueue the lock request that will wait
to be granted, note that we already own
the trx mutex. */
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ trx->lock.was_chosen_as_deadlock_victim) {
+ return(DB_DEADLOCK);
+ }
+ lock = lock_rec_create(
+ c_lock, thr,
+ type_mode | LOCK_WAIT, block, heap_no,
+ index, trx, TRUE);
+#else
lock = lock_rec_create(
type_mode | LOCK_WAIT, block, heap_no,
index, trx, TRUE);
+#endif /* WITH_WSREP */
} else {
ut_ad(lock->type_mode & LOCK_WAIT);
ut_ad(lock->type_mode & LOCK_CONV_BY_OTHER);
@@ -2019,7 +2195,19 @@ lock_rec_add_to_queue(
const lock_t* other_lock
= lock_rec_other_has_expl_req(mode, 0, LOCK_WAIT,
block, heap_no, trx);
+#ifdef WITH_WSREP
+ /* this can potentionally assert with wsrep */
+ if (wsrep_on(trx->mysql_thd)) {
+ if (wsrep_debug && other_lock) {
+ fprintf(stderr,
+ "WSREP: InnoDB assert ignored\n");
+ }
+ } else {
+ ut_a(!other_lock);
+ }
+#else
ut_a(!other_lock);
+#endif /* WITH_WSREP */
}
#endif /* UNIV_DEBUG */
@@ -2070,9 +2258,15 @@ lock_rec_add_to_queue(
}
somebody_waits:
+#ifdef WITH_WSREP
+ return(lock_rec_create(NULL, NULL,
+ type_mode, block, heap_no, index, trx,
+ caller_owns_trx_mutex));
+#else
return(lock_rec_create(
type_mode, block, heap_no, index, trx,
caller_owns_trx_mutex));
+#endif /* WITH_WSREP */
}
/** Record locking request status */
@@ -2123,6 +2317,11 @@ lock_rec_lock_fast(
|| (LOCK_MODE_MASK & mode) == LOCK_X);
ut_ad(mode - (LOCK_MODE_MASK & mode) == LOCK_GAP
|| mode - (LOCK_MODE_MASK & mode) == 0
+#ifdef WITH_WSREP
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == 0
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == LOCK_GAP
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == LOCK_REC_NOT_GAP
+#endif /* WITH_WSREP */
|| mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP);
ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
@@ -2135,9 +2334,13 @@ lock_rec_lock_fast(
if (lock == NULL) {
if (!impl) {
/* Note that we don't own the trx mutex. */
+#ifdef WITH_WSREP
+ lock = lock_rec_create(NULL, thr,
+ mode, block, heap_no, index, trx, FALSE);
+#else
lock = lock_rec_create(
mode, block, heap_no, index, trx, FALSE);
-
+#endif /* WITH_WSREP */
}
status = LOCK_REC_SUCCESS_CREATED;
} else {
@@ -2192,6 +2395,9 @@ lock_rec_lock_slow(
trx_t* trx;
lock_t* lock;
dberr_t err = DB_SUCCESS;
+#ifdef WITH_WSREP
+ lock_t* c_lock = NULL;
+#endif
ut_ad(lock_mutex_own());
ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
@@ -2202,6 +2408,11 @@ lock_rec_lock_slow(
|| (LOCK_MODE_MASK & mode) == LOCK_X);
ut_ad(mode - (LOCK_MODE_MASK & mode) == LOCK_GAP
|| mode - (LOCK_MODE_MASK & mode) == 0
+#ifdef WITH_WSREP
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == 0
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == LOCK_GAP
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == LOCK_REC_NOT_GAP
+#endif /* WITH_WSREP */
|| mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP);
ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
@@ -2234,9 +2445,15 @@ lock_rec_lock_slow(
/* The trx already has a strong enough lock on rec: do
nothing */
+#ifdef WITH_WSREP
+ } else if ((c_lock = (lock_t *)lock_rec_other_has_conflicting(
+ static_cast<enum lock_mode>(mode),
+ block, heap_no, trx))) {
+#else
} else if (lock_rec_other_has_conflicting(
static_cast<enum lock_mode>(mode),
block, heap_no, trx)) {
+#endif /* WITH_WSREP */
/* If another transaction has a non-gap conflicting
request in the queue, as this transaction does not
@@ -2245,9 +2462,16 @@ lock_rec_lock_slow(
ut_ad(lock == NULL);
enqueue_waiting:
+#ifdef WITH_WSREP
+ /* c_lock is NULL here if jump to enqueue_waiting happened
+ but it's ok because lock is not NULL in that case and c_lock
+ is not used. */
+ err = lock_rec_enqueue_waiting(c_lock,
+ mode, block, heap_no, lock, index, thr);
+#else
err = lock_rec_enqueue_waiting(
mode, block, heap_no, lock, index, thr);
-
+#endif /* WITH_WSREP */
} else if (!impl) {
/* Set the requested lock on the record, note that
we already own the transaction mutex. */
@@ -2297,6 +2521,11 @@ lock_rec_lock(
|| (LOCK_MODE_MASK & mode) == LOCK_X);
ut_ad(mode - (LOCK_MODE_MASK & mode) == LOCK_GAP
|| mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP
+#ifdef WITH_WSREP
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == 0
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == LOCK_GAP
+ || mode - (LOCK_MODE_MASK & mode) - WSREP_BF == LOCK_REC_NOT_GAP
+#endif /* WITH_WSREP */
|| mode - (LOCK_MODE_MASK & mode) == 0);
ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
@@ -3744,6 +3973,11 @@ lock_deadlock_select_victim(
/* The joining transaction is 'smaller',
choose it as the victim and roll it back. */
+#ifdef WITH_WSREP
+ if (wsrep_thd_is_brute_force(ctx->start->mysql_thd))
+ return(ctx->wait_lock->trx);
+ else
+#endif /* WITH_WSREP */
return(ctx->start);
}
@@ -5930,6 +6164,9 @@ lock_rec_insert_check_and_lock(
lock_t* lock;
dberr_t err;
ulint next_rec_heap_no;
+#ifdef WITH_WSREP
+ lock_t* c_lock=NULL;
+#endif
ut_ad(block->frame == page_align(rec));
ut_ad(!dict_index_is_online_ddl(index)
@@ -5991,17 +6228,30 @@ lock_rec_insert_check_and_lock(
had to wait for their insert. Both had waiting gap type lock requests
on the successor, which produced an unnecessary deadlock. */
+#ifdef WITH_WSREP
+ if ((c_lock = (ib_lock_t*)lock_rec_other_has_conflicting(
+ static_cast<enum lock_mode>(
+ LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION | WSREP_BF),
+ block, next_rec_heap_no, trx))) {
+#else
if (lock_rec_other_has_conflicting(
static_cast<enum lock_mode>(
LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION),
block, next_rec_heap_no, trx)) {
+#endif
/* Note that we may get DB_SUCCESS also here! */
trx_mutex_enter(trx);
+#ifdef WITH_WSREP
+ err = lock_rec_enqueue_waiting(c_lock,
+ LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION,
+ block, next_rec_heap_no, NULL, index, thr);
+#else
err = lock_rec_enqueue_waiting(
LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION,
block, next_rec_heap_no, NULL, index, thr);
+#endif
trx_mutex_exit(trx);
} else {
diff --git a/storage/xtradb/os/os0file.cc b/storage/xtradb/os/os0file.cc
index 38eb5241da1..3f65e9f59b2 100644
--- a/storage/xtradb/os/os0file.cc
+++ b/storage/xtradb/os/os0file.cc
@@ -1795,7 +1795,9 @@ os_file_create_func(
} while (retry);
- if (srv_use_atomic_writes && type == OS_DATA_FILE &&
+ if (file != INVALID_HANDLE_VALUE
+ && srv_use_atomic_writes
+ && type == OS_DATA_FILE &&
!os_file_set_atomic_writes(name, file)) {
CloseHandle(file);
*success = FALSE;
@@ -1932,7 +1934,9 @@ os_file_create_func(
}
#endif /* USE_FILE_LOCK */
- if (srv_use_atomic_writes && type == OS_DATA_FILE
+ if (file != -1
+ && srv_use_atomic_writes
+ && type == OS_DATA_FILE
&& !os_file_set_atomic_writes(name, file)) {
*success = FALSE;
diff --git a/storage/xtradb/rem/rem0rec.cc b/storage/xtradb/rem/rem0rec.cc
index 43072159b9e..f31b16ecd9f 100644
--- a/storage/xtradb/rem/rem0rec.cc
+++ b/storage/xtradb/rem/rem0rec.cc
@@ -33,6 +33,9 @@ Created 5/30/1994 Heikki Tuuri
#include "mtr0mtr.h"
#include "mtr0log.h"
#include "fts0fts.h"
+#ifdef WITH_WSREP
+#include <ha_prototypes.h>
+#endif /* WITH_WSREP */
/* PHYSICAL RECORD (OLD STYLE)
===========================
@@ -1916,6 +1919,138 @@ rec_print(
}
}
}
+#endif /* !UNIV_HOTBACKUP */
+
+#ifdef WITH_WSREP
+int
+wsrep_rec_get_foreign_key(
+ byte *buf, /* out: extracted key */
+ ulint *buf_len, /* in/out: length of buf */
+ const rec_t* rec, /* in: physical record */
+ dict_index_t* index_for, /* in: index in foreign table */
+ dict_index_t* index_ref, /* in: index in referenced table */
+ ibool new_protocol) /* in: protocol > 1 */
+{
+ const byte* data;
+ ulint len;
+ ulint key_len = 0;
+ ulint i;
+ uint key_parts;
+ mem_heap_t* heap = NULL;
+ ulint offsets_[REC_OFFS_NORMAL_SIZE];
+ const ulint* offsets;
+
+ ut_ad(index_for);
+ ut_ad(index_ref);
+
+ rec_offs_init(offsets_);
+ offsets = rec_get_offsets(rec, index_for, offsets_,
+ ULINT_UNDEFINED, &heap);
+
+ ut_ad(rec_offs_validate(rec, NULL, offsets));
+
+ ut_ad(rec);
+
+ key_parts = dict_index_get_n_unique_in_tree(index_for);
+ for (i = 0;
+ i < key_parts &&
+ (index_for->type & DICT_CLUSTERED || i < key_parts - 1);
+ i++) {
+ dict_field_t* field_f =
+ dict_index_get_nth_field(index_for, i);
+ const dict_col_t* col_f = dict_field_get_col(field_f);
+ dict_field_t* field_r =
+ dict_index_get_nth_field(index_ref, i);
+ const dict_col_t* col_r = dict_field_get_col(field_r);
+
+ data = rec_get_nth_field(rec, offsets, i, &len);
+ if (key_len + ((len != UNIV_SQL_NULL) ? len + 1 : 1) >
+ *buf_len) {
+ fprintf (stderr,
+ "WSREP: FK key len exceeded %lu %lu %lu\n",
+ key_len, len, *buf_len);
+ goto err_out;
+ }
+
+ if (len == UNIV_SQL_NULL) {
+ ut_a(!(col_f->prtype & DATA_NOT_NULL));
+ *buf++ = 1;
+ key_len++;
+ } else if (!new_protocol) {
+ if (!(col_r->prtype & DATA_NOT_NULL)) {
+ *buf++ = 0;
+ key_len++;
+ }
+ memcpy(buf, data, len);
+ wsrep_innobase_mysql_sort(
+ (int)(col_f->prtype & DATA_MYSQL_TYPE_MASK),
+ (uint)dtype_get_charset_coll(col_f->prtype),
+ buf, len);
+ } else { /* new protocol */
+ if (!(col_r->prtype & DATA_NOT_NULL)) {
+ *buf++ = 0;
+ key_len++;
+ }
+ switch (col_f->mtype) {
+ case DATA_INT: {
+ byte* ptr = buf+len;
+ for (;;) {
+ ptr--;
+ *ptr = *data;
+ if (ptr == buf) {
+ break;
+ }
+ data++;
+ }
+
+ if (!(col_f->prtype & DATA_UNSIGNED)) {
+ buf[len-1] = (byte) (buf[len-1] ^ 128);
+ }
+
+ break;
+ }
+ case DATA_VARCHAR:
+ case DATA_VARMYSQL:
+ case DATA_CHAR:
+ case DATA_MYSQL:
+ /* Copy the actual data */
+ ut_memcpy(buf, data, len);
+ wsrep_innobase_mysql_sort(
+ (int)
+ (col_f->prtype & DATA_MYSQL_TYPE_MASK),
+ (uint)
+ dtype_get_charset_coll(col_f->prtype),
+ buf, len);
+ break;
+ case DATA_BLOB:
+ case DATA_BINARY:
+ memcpy(buf, data, len);
+ break;
+ default:
+ break;
+ }
+
+ key_len += len;
+ buf += len;
+ }
+ }
+
+ rec_validate(rec, offsets);
+
+ if (UNIV_LIKELY_NULL(heap)) {
+ mem_heap_free(heap);
+ }
+
+ *buf_len = key_len;
+ return DB_SUCCESS;
+
+ err_out:
+ if (UNIV_LIKELY_NULL(heap)) {
+ mem_heap_free(heap);
+ }
+ return DB_ERROR;
+}
+#endif /* WITH_WSREP */
# ifdef UNIV_DEBUG
/************************************************************//**
@@ -1958,5 +2093,4 @@ rec_get_trx_id(
return(trx_read_trx_id(trx_id));
}
-# endif /* UNIV_DEBUG */
-#endif /* !UNIV_HOTBACKUP */
+#endif /* UNIV_DEBUG */
diff --git a/storage/xtradb/row/row0ins.cc b/storage/xtradb/row/row0ins.cc
index 34e34925b9a..a43895f7949 100644
--- a/storage/xtradb/row/row0ins.cc
+++ b/storage/xtradb/row/row0ins.cc
@@ -918,6 +918,14 @@ row_ins_invalidate_query_cache(
innobase_invalidate_query_cache(thr_get_trx(thr), buf, len);
mem_free(buf);
}
+#ifdef WITH_WSREP
+ulint wsrep_append_foreign_key(trx_t *trx,
+ dict_foreign_t* foreign,
+ const rec_t* clust_rec,
+ dict_index_t* clust_index,
+ ibool referenced,
+ ibool shared);
+#endif /* WITH_WSREP */
/*********************************************************************//**
Perform referential actions or checks when a parent row is deleted or updated
@@ -1272,6 +1280,17 @@ row_ins_foreign_check_on_constraint(
err = row_update_cascade_for_mysql(thr, cascade,
foreign->foreign_table);
+#ifdef WITH_WSREP
+ if (err == DB_SUCCESS) {
+ err = (dberr_t)wsrep_append_foreign_key(
+ thr_get_trx(thr),
+ foreign,
+ clust_rec,
+ clust_index,
+ FALSE, FALSE);
+ }
+#endif /* WITH_WSREP */
+
if (foreign->foreign_table->n_foreign_key_checks_running == 0) {
fprintf(stderr,
"InnoDB: error: table %s has the counter 0"
@@ -1607,7 +1626,14 @@ run_again:
if (check_ref) {
err = DB_SUCCESS;
-
+#ifdef WITH_WSREP
+ err = (dberr_t) wsrep_append_foreign_key(
+ thr_get_trx(thr),
+ foreign,
+ rec,
+ check_index,
+ check_ref, TRUE);
+#endif /* WITH_WSREP */
goto end_scan;
} else if (foreign->type != 0) {
/* There is an ON UPDATE or ON DELETE
@@ -1911,6 +1937,9 @@ row_ins_scan_sec_index_for_duplicate(
mem_heap_t* offsets_heap)
/*!< in/out: memory heap that can be emptied */
{
+#ifdef WITH_WSREP
+ trx_t* trx = thr_get_trx(thr);
+#endif
ulint n_unique;
int cmp;
ulint n_fields_cmp;
@@ -1979,7 +2008,16 @@ row_ins_scan_sec_index_for_duplicate(
if (flags & BTR_NO_LOCKING_FLAG) {
/* Set no locks when applying log
in online table rebuild. */
+
+#ifdef WITH_WSREP
+ /* slave applier must not get duplicate error */
+ } else if (allow_duplicates ||
+ (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_brute_force(trx->mysql_thd))) {
+#else
+
} else if (allow_duplicates) {
+#endif
/* If the SQL-query will update or replace
duplicate key we will take X-lock for
@@ -2185,7 +2223,13 @@ row_ins_duplicate_error_in_clust(
sure that in roll-forward we get the same duplicate
errors as in original execution */
+#ifdef WITH_WSREP
+ if (trx->duplicates ||
+ (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_brute_force(trx->mysql_thd))) {
+#else
if (trx->duplicates) {
+#endif
/* If the SQL-query will update or replace
duplicate key we will take X-lock for
@@ -2230,7 +2274,13 @@ duplicate:
offsets = rec_get_offsets(rec, cursor->index, offsets,
ULINT_UNDEFINED, &heap);
+#ifdef WITH_WSREP
+ if (trx->duplicates ||
+ (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_brute_force(trx->mysql_thd))) {
+#else
if (trx->duplicates) {
+#endif
/* If the SQL-query will update or replace
duplicate key we will take X-lock for
diff --git a/storage/xtradb/row/row0upd.cc b/storage/xtradb/row/row0upd.cc
index 4cf1c604c47..ee80e034eb1 100644
--- a/storage/xtradb/row/row0upd.cc
+++ b/storage/xtradb/row/row0upd.cc
@@ -53,6 +53,9 @@ Created 12/27/1996 Heikki Tuuri
#include "pars0sym.h"
#include "eval0eval.h"
#include "buf0lru.h"
+#ifdef WITH_WSREP
+extern my_bool wsrep_debug;
+#endif
/* What kind of latch and lock can we assume when the control comes to
@@ -172,6 +175,28 @@ func_exit:
return(is_referenced);
}
+#ifdef WITH_WSREP
+ulint
+wsrep_append_foreign_key(
+ trx_t* trx,
+ dict_foreign_t* foreign,
+ const rec_t* clust_rec,
+ dict_index_t* clust_index,
+ ibool referenced,
+ ibool shared);
+
+ulint
+wsrep_row_upd_check_foreign_constraints(
+ upd_node_t* node, /*!< in: row update node */
+ btr_pcur_t* pcur, /*!< in: cursor positioned on a record; NOTE: the
+ cursor position is lost in this function! */
+ dict_table_t* table, /*!< in: table in question */
+ dict_index_t* index, /*!< in: index of the cursor */
+ ulint* offsets,/*!< in/out: rec_get_offsets(pcur.rec, index) */
+ que_thr_t* thr, /*!< in: query thread */
+ mtr_t* mtr); /*!< in: mtr */
+#endif /* WITH_WSREP */
+
/*********************************************************************//**
Checks if possible foreign key constraints hold after a delete of the record
under pcur.
@@ -1822,6 +1847,34 @@ row_upd_sec_index_entry(
node, &pcur, index->table,
index, offsets, thr, &mtr);
}
+#ifdef WITH_WSREP
+ if (err == DB_SUCCESS && !referenced) {
+ ulint* offsets =
+ rec_get_offsets(
+ rec, index, NULL, ULINT_UNDEFINED,
+ &heap);
+ err = (dberr_t) wsrep_row_upd_check_foreign_constraints(
+ node, &pcur, index->table,
+ index, offsets, thr, &mtr);
+
+ switch (err) {
+ case DB_SUCCESS:
+ case DB_NO_REFERENCED_ROW:
+ err = DB_SUCCESS;
+ break;
+ case DB_DEADLOCK:
+ if (wsrep_debug)
+ fprintf (stderr,
+ "WSREP: sec index FK check fail for deadlock");
+ break;
+ default:
+ fprintf (stderr,
+ "WSREP: referenced FK check fail: %d",
+ err);
+ break;
+ }
+ }
+#endif /* WITH_WSREP */
}
break;
}
@@ -1962,6 +2015,123 @@ row_upd_clust_rec_by_insert_inherit_func(
return(inherit);
}
+#ifdef WITH_WSREP
+ulint
+wsrep_row_upd_check_foreign_constraints(
+/*=================================*/
+ upd_node_t* node, /*!< in: row update node */
+ btr_pcur_t* pcur, /*!< in: cursor positioned on a record; NOTE: the
+ cursor position is lost in this function! */
+ dict_table_t* table, /*!< in: table in question */
+ dict_index_t* index, /*!< in: index of the cursor */
+ ulint* offsets,/*!< in/out: rec_get_offsets(pcur.rec, index) */
+ que_thr_t* thr, /*!< in: query thread */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ dict_foreign_t* foreign;
+ mem_heap_t* heap;
+ dtuple_t* entry;
+ trx_t* trx;
+ const rec_t* rec;
+ ulint n_ext;
+ ulint err;
+ ibool got_s_lock = FALSE;
+
+ if (UT_LIST_GET_FIRST(table->foreign_list) == NULL) {
+
+ return(DB_SUCCESS);
+ }
+
+ trx = thr_get_trx(thr);
+
+ rec = btr_pcur_get_rec(pcur);
+ ut_ad(rec_offs_validate(rec, index, offsets));
+
+ heap = mem_heap_create(500);
+
+ entry = row_rec_to_index_entry(rec, index, offsets, &n_ext, heap);
+
+ mtr_commit(mtr);
+
+ mtr_start(mtr);
+
+ if (trx->dict_operation_lock_mode == 0) {
+ got_s_lock = TRUE;
+
+ row_mysql_freeze_data_dictionary(trx);
+ }
+
+ foreign = UT_LIST_GET_FIRST(table->foreign_list);
+
+ while (foreign) {
+ /* Note that we may have an update which updates the index
+ record, but does NOT update the first fields which are
+ referenced in a foreign key constraint. Then the update does
+ NOT break the constraint. */
+
+ if (foreign->foreign_index == index
+ && (node->is_delete
+ || row_upd_changes_first_fields_binary(
+ entry, index, node->update,
+ foreign->n_fields))) {
+
+ if (foreign->referenced_table == NULL) {
+ foreign->referenced_table =
+ dict_table_open_on_name(
+ foreign->referenced_table_name_lookup,
+ FALSE, FALSE, DICT_ERR_IGNORE_NONE);
+ }
+
+ if (foreign->referenced_table) {
+ mutex_enter(&(dict_sys->mutex));
+
+ (foreign->referenced_table
+ ->n_foreign_key_checks_running)++;
+
+ mutex_exit(&(dict_sys->mutex));
+ }
+
+ /* NOTE that if the thread ends up waiting for a lock
+ we will release dict_operation_lock temporarily!
+ But the counter on the table protects 'foreign' from
+ being dropped while the check is running. */
+
+ err = row_ins_check_foreign_constraint(
+ TRUE, foreign, table, entry, thr);
+
+ if (foreign->referenced_table) {
+ mutex_enter(&(dict_sys->mutex));
+
+ ut_a(foreign->referenced_table
+ ->n_foreign_key_checks_running > 0);
+
+ (foreign->referenced_table
+ ->n_foreign_key_checks_running)--;
+
+ mutex_exit(&(dict_sys->mutex));
+ }
+
+ if (err != DB_SUCCESS) {
+
+ goto func_exit;
+ }
+ }
+
+ foreign = UT_LIST_GET_NEXT(foreign_list, foreign);
+ }
+
+ err = DB_SUCCESS;
+func_exit:
+ if (got_s_lock) {
+ row_mysql_unfreeze_data_dictionary(trx);
+ }
+
+ mem_heap_free(heap);
+
+ return(err);
+}
+#endif /* WITH_WSREP */
+
/***********************************************************//**
Marks the clustered index record deleted and inserts the updated version
of the record to the index. This function should be used when the ordering
@@ -2070,6 +2240,31 @@ err_exit:
goto err_exit;
}
}
+#ifdef WITH_WSREP
+ if (!referenced) {
+ err = (dberr_t) wsrep_row_upd_check_foreign_constraints(
+ node, pcur, table, index, offsets, thr, mtr);
+
+ switch (err) {
+ case DB_SUCCESS:
+ case DB_NO_REFERENCED_ROW:
+ err = DB_SUCCESS;
+ break;
+ case DB_DEADLOCK:
+ if (wsrep_debug) fprintf (stderr,
+ "WSREP: insert FK check fail for deadlock");
+ break;
+ default:
+ fprintf (stderr,
+ "WSREP: referenced FK check fail: %d",
+ err);
+ break;
+ }
+ if (err != DB_SUCCESS) {
+ goto err_exit;
+ }
+ }
+#endif /* WITH_WSREP */
}
mtr_commit(mtr);
@@ -2305,6 +2500,9 @@ row_upd_del_mark_clust_rec(
btr_pcur_t* pcur;
btr_cur_t* btr_cur;
dberr_t err;
+#ifdef WITH_WSREP
+ rec_t* rec;
+#endif /* WITH_WSREP */
ut_ad(node);
ut_ad(dict_index_is_clust(index));
@@ -2321,9 +2519,18 @@ row_upd_del_mark_clust_rec(
/* Mark the clustered index record deleted; we do not have to check
locks, because we assume that we have an x-lock on the record */
+#ifdef WITH_WSREP
+ rec = btr_cur_get_rec(btr_cur);
+#endif /* WITH_WSREP */
+
err = btr_cur_del_mark_set_clust_rec(
- btr_cur_get_block(btr_cur), btr_cur_get_rec(btr_cur),
- index, offsets, thr, mtr);
+ btr_cur_get_block(btr_cur),
+#ifdef WITH_WSREP
+ rec, index, offsets, thr, mtr);
+#else
+ btr_cur_get_rec(btr_cur), index, offsets, thr, mtr);
+#endif /* WITH_WSREP */
+
if (err == DB_SUCCESS && referenced) {
/* NOTE that the following call loses the position of pcur ! */
@@ -2331,6 +2538,29 @@ row_upd_del_mark_clust_rec(
node, pcur, index->table, index, offsets, thr, mtr);
}
+#ifdef WITH_WSREP
+ if (err == DB_SUCCESS && !referenced) {
+ err = (dberr_t) wsrep_row_upd_check_foreign_constraints(
+ node, pcur, index->table, index, offsets, thr, mtr);
+
+ switch (err) {
+ case DB_SUCCESS:
+ case DB_NO_REFERENCED_ROW:
+ err = DB_SUCCESS;
+ break;
+ case DB_DEADLOCK:
+ if (wsrep_debug) fprintf (stderr,
+ "WSREP: clust rec FK check fail for deadlock");
+ break;
+ default:
+ fprintf (stderr,
+ "WSREP: clust rec referenced FK check fail: %lu",
+ err);
+ break;
+ }
+ }
+#endif /* WITH_WSREP */
+
mtr_commit(mtr);
return(err);
diff --git a/storage/xtradb/srv/srv0conc.cc b/storage/xtradb/srv/srv0conc.cc
index 413d5c4eab2..a1dd90a024a 100644
--- a/storage/xtradb/srv/srv0conc.cc
+++ b/storage/xtradb/srv/srv0conc.cc
@@ -43,6 +43,10 @@ Created 2011/04/18 Sunny Bains
#include "trx0trx.h"
#include "mysql/plugin.h"
+#ifdef WITH_WSREP
+extern "C" int wsrep_trx_is_aborting(void *thd_ptr);
+extern my_bool wsrep_debug;
+#endif
/** Number of times a thread is allowed to enter InnoDB within the same
SQL query after it has once got the ticket. */
@@ -614,5 +618,32 @@ srv_conc_get_active_threads(void)
/*==============================*/
{
return(srv_conc.n_active);
- }
+}
+
+#ifdef WITH_WSREP
+UNIV_INTERN
+void
+wsrep_srv_conc_cancel_wait(
+/*==================*/
+ trx_t* trx) /*!< in: transaction object associated with the
+ thread */
+{
+#ifdef HAVE_ATOMIC_BUILTINS
+ /* aborting transactions will enter innodb by force in
+ srv_conc_enter_innodb_with_atomics(). No need to cancel here,
+ thr will wake up after os_sleep and let to enter innodb
+ */
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: conc slot cancel, no atomics\n");
+#else
+ os_fast_mutex_lock(&srv_conc_mutex);
+ if (trx->wsrep_event) {
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: conc slot cancel\n");
+ os_event_set(trx->wsrep_event);
+ }
+ os_fast_mutex_unlock(&srv_conc_mutex);
+#endif
+}
+#endif /* WITH_WSREP */
diff --git a/storage/xtradb/srv/srv0srv.cc b/storage/xtradb/srv/srv0srv.cc
index 953bbba11f7..87615e161da 100644
--- a/storage/xtradb/srv/srv0srv.cc
+++ b/storage/xtradb/srv/srv0srv.cc
@@ -82,6 +82,14 @@ ulong innobase_thd_get_thread_id(const void* thd);
/* prototypes for new functions added to ha_innodb.cc */
ibool innobase_get_slow_log();
+#ifdef WITH_WSREP
+extern int wsrep_debug;
+extern int wsrep_trx_is_aborting(void *thd_ptr);
+#endif
+/* The following counter is incremented whenever there is some user activity
+in the server */
+UNIV_INTERN ulint srv_activity_count = 0;
+
/* The following is the maximum allowed duration of a lock wait. */
UNIV_INTERN ulint srv_fatal_semaphore_wait_threshold = 600;
@@ -160,6 +168,8 @@ use simulated aio we build below with threads.
Currently we support native aio on windows and linux */
UNIV_INTERN my_bool srv_use_native_aio = TRUE;
+UNIV_INTERN my_bool srv_lock_timeout_active = FALSE;
+
#ifdef __WIN__
/* Windows native condition variables. We use runtime loading / function
pointers, because they are not available on Windows Server 2003 and
@@ -2040,6 +2050,28 @@ exit_func:
OS_THREAD_DUMMY_RETURN;
}
+#ifdef WITH_WSREP
+/*********************************************************************//**
+check if lock timeout was for priority thread,
+as a side effect trigger lock monitor
+@return false for regular lock timeout */
+static ibool
+wsrep_is_BF_lock_timeout(
+/*====================*/
+ srv_slot_t* slot) /* in: lock slot to check for lock priority */
+{
+ if (wsrep_on(thr_get_trx(slot->thr)->mysql_thd) &&
+ wsrep_thd_is_brute_force((thr_get_trx(slot->thr))->mysql_thd)) {
+ fprintf(stderr, "WSREP: BF lock wait long\n");
+ srv_print_innodb_monitor = TRUE;
+ srv_print_innodb_lock_monitor = TRUE;
+ os_event_set(lock_sys->timeout_event);
+ return TRUE;
+ }
+ return FALSE;
+ }
+#endif /* WITH_WSREP */
+
/*********************************************************************//**
A thread which prints warnings about semaphore waits which have lasted
too long. These can be used to track bugs which cause hangs.
diff --git a/storage/xtradb/trx/trx0roll.cc b/storage/xtradb/trx/trx0roll.cc
index 1089607c6d1..eb2af877a6d 100644
--- a/storage/xtradb/trx/trx0roll.cc
+++ b/storage/xtradb/trx/trx0roll.cc
@@ -45,6 +45,9 @@ Created 3/26/1996 Heikki Tuuri
#include "pars0pars.h"
#include "srv0mon.h"
#include "trx0sys.h"
+#ifdef WITH_WSREP
+#include "ha_prototypes.h"
+#endif /* WITH_WSREP */
/** This many pages must be undone before a truncate is tried within
rollback */
@@ -382,6 +385,13 @@ trx_rollback_to_savepoint_for_mysql_low(
trx->op_info = "";
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ trx->lock.was_chosen_as_deadlock_victim) {
+ trx->lock.was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
+
return(err);
}
@@ -1020,6 +1030,12 @@ trx_roll_try_truncate(
if (trx->update_undo) {
trx_undo_truncate_end(trx, trx->update_undo, limit);
}
+
+#ifdef WITH_WSREP_OUT
+ if (wsrep_on(trx->mysql_thd)) {
+ trx->lock.was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif /* WITH_WSREP */
}
/***********************************************************************//**
diff --git a/storage/xtradb/trx/trx0sys.cc b/storage/xtradb/trx/trx0sys.cc
index b86ee90b1e9..daa13b8b2c5 100644
--- a/storage/xtradb/trx/trx0sys.cc
+++ b/storage/xtradb/trx/trx0sys.cc
@@ -44,6 +44,10 @@ Created 3/26/1996 Heikki Tuuri
#include "os0file.h"
#include "read0read.h"
+#ifdef WITH_WSREP
+#include "ha_prototypes.h" /* wsrep_is_wsrep_xid() */
+#endif /* */
+
/** The file format tag structure with id and name. */
struct file_format_t {
ulint id; /*!< id of the file format */
@@ -202,10 +206,14 @@ trx_sys_update_mysql_binlog_offset(
ib_int64_t offset, /*!< in: position in that log file */
ulint field, /*!< in: offset of the MySQL log info field in
the trx sys header */
+#ifdef WITH_WSREP
+ trx_sysf_t* sys_header, /*!< in: trx sys header */
+#endif /* WITH_WSREP */
mtr_t* mtr) /*!< in: mtr */
{
+#ifndef WITH_WSREP
trx_sysf_t* sys_header;
-
+#endif
if (ut_strlen(file_name) >= TRX_SYS_MYSQL_LOG_NAME_LEN) {
/* We cannot fit the name to the 512 bytes we have reserved */
@@ -213,7 +221,9 @@ trx_sys_update_mysql_binlog_offset(
return;
}
+#ifndef WITH_WSREP
sys_header = trx_sysf_get(mtr);
+#endif
if (mach_read_from_4(sys_header + field
+ TRX_SYS_MYSQL_LOG_MAGIC_N_FLD)
@@ -300,6 +310,87 @@ trx_sys_print_mysql_binlog_offset(void)
mtr_commit(&mtr);
}
+#ifdef WITH_WSREP
+
+void
+trx_sys_update_wsrep_checkpoint(
+ const XID* xid, /*!< in: transaction XID */
+ trx_sysf_t* sys_header, /*!< in: sys_header */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ ut_ad(xid && mtr);
+ ut_a(xid->formatID == -1 || wsrep_is_wsrep_xid((const void *)xid));
+
+ if (mach_read_from_4(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_MAGIC_N_FLD)
+ != TRX_SYS_WSREP_XID_MAGIC_N) {
+ mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_MAGIC_N_FLD,
+ TRX_SYS_WSREP_XID_MAGIC_N,
+ MLOG_4BYTES, mtr);
+ }
+
+ mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_FORMAT,
+ (int)xid->formatID,
+ MLOG_4BYTES, mtr);
+ mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_GTRID_LEN,
+ (int)xid->gtrid_length,
+ MLOG_4BYTES, mtr);
+ mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_BQUAL_LEN,
+ (int)xid->bqual_length,
+ MLOG_4BYTES, mtr);
+ mlog_write_string(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_DATA,
+ (const unsigned char*) xid->data,
+ XIDDATASIZE, mtr);
+
+}
+
+void
+trx_sys_read_wsrep_checkpoint(XID* xid)
+/*===================================*/
+{
+ trx_sysf_t* sys_header;
+ mtr_t mtr;
+ ulint magic;
+
+ ut_ad(xid);
+
+ mtr_start(&mtr);
+
+ sys_header = trx_sysf_get(&mtr);
+
+ if ((magic = mach_read_from_4(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_MAGIC_N_FLD))
+ != TRX_SYS_WSREP_XID_MAGIC_N) {
+ memset(xid, 0, sizeof(*xid));
+ xid->formatID = -1;
+ trx_sys_update_wsrep_checkpoint(xid, sys_header, &mtr);
+ mtr_commit(&mtr);
+ return;
+ }
+
+ xid->formatID = (int)mach_read_from_4(
+ sys_header
+ + TRX_SYS_WSREP_XID_INFO + TRX_SYS_WSREP_XID_FORMAT);
+ xid->gtrid_length = (int)mach_read_from_4(
+ sys_header
+ + TRX_SYS_WSREP_XID_INFO + TRX_SYS_WSREP_XID_GTRID_LEN);
+ xid->bqual_length = (int)mach_read_from_4(
+ sys_header
+ + TRX_SYS_WSREP_XID_INFO + TRX_SYS_WSREP_XID_BQUAL_LEN);
+ ut_memcpy(xid->data,
+ sys_header + TRX_SYS_WSREP_XID_INFO + TRX_SYS_WSREP_XID_DATA,
+ XIDDATASIZE);
+
+ mtr_commit(&mtr);
+}
+
+#endif /* WITH_WSREP */
+
/*****************************************************************//**
Prints to stderr the MySQL master log offset info in the trx system header if
the magic number shows it valid. */
diff --git a/storage/xtradb/trx/trx0trx.cc b/storage/xtradb/trx/trx0trx.cc
index 432cb8f6330..bbf76effc52 100644
--- a/storage/xtradb/trx/trx0trx.cc
+++ b/storage/xtradb/trx/trx0trx.cc
@@ -288,6 +288,9 @@ trx_create(void)
trx->lock.table_locks = ib_vector_create(
heap_alloc, sizeof(void**), 32);
+#ifdef WITH_WSREP
+ trx->wsrep_event = NULL;
+#endif /* WITH_WSREP */
return(trx);
}
@@ -1012,6 +1015,11 @@ trx_start_low(
srv_undo_logs, srv_undo_tablespaces);
}
+#ifdef WITH_WSREP
+ memset(&trx->xid, 0, sizeof(trx->xid));
+ trx->xid.formatID = -1;
+#endif /* WITH_WSREP */
+
/* The initial value for trx->no: TRX_ID_MAX is used in
read_view_open_now: */
@@ -1132,6 +1140,9 @@ trx_write_serialisation_history(
trx_t* trx, /*!< in/out: transaction */
mtr_t* mtr) /*!< in/out: mini-transaction */
{
+#ifdef WITH_WSREP
+ trx_sysf_t* sys_header;
+#endif /* WITH_WSREP */
trx_rseg_t* rseg;
rseg = trx->rseg;
@@ -1178,6 +1189,15 @@ trx_write_serialisation_history(
MONITOR_INC(MONITOR_TRX_COMMIT_UNDO);
+#ifdef WITH_WSREP
+ sys_header = trx_sysf_get(mtr);
+ /* Update latest MySQL wsrep XID in trx sys header. */
+ if (wsrep_is_wsrep_xid((const void *)&trx->xid))
+ {
+ trx_sys_update_wsrep_checkpoint(&trx->xid, sys_header, mtr);
+ }
+#endif /* WITH_WSREP */
+
/* Update the latest MySQL binlog name and offset info
in trx sys header if MySQL binlogging is on or the database
server is a MySQL replication slave */
@@ -1188,7 +1208,11 @@ trx_write_serialisation_history(
trx_sys_update_mysql_binlog_offset(
trx->mysql_log_file_name,
trx->mysql_log_offset,
- TRX_SYS_MYSQL_LOG_INFO, mtr);
+ TRX_SYS_MYSQL_LOG_INFO,
+#ifdef WITH_WSREP
+ sys_header,
+#endif /* WITH_WSREP */
+ mtr);
trx->mysql_log_file_name = NULL;
}
@@ -1495,6 +1519,11 @@ trx_commit_in_memory(
trx->dict_operation = TRX_DICT_OP_NONE;
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd)) {
+ trx->lock.was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
ut_ad(trx_sys->descr_n_used <= UT_LIST_GET_LEN(trx_sys->rw_trx_list));
trx->error_state = DB_SUCCESS;
@@ -1677,6 +1706,10 @@ trx_commit_or_rollback_prepare(
switch (trx->state) {
case TRX_STATE_NOT_STARTED:
+#ifdef WITH_WSREP
+ ut_d(trx->start_file = __FILE__);
+ ut_d(trx->start_line = __LINE__);
+#endif /* WITH_WSREP */
trx_start_low(trx);
/* fall through */
case TRX_STATE_ACTIVE:
@@ -2447,6 +2480,10 @@ trx_start_if_not_started_low(
{
switch (trx->state) {
case TRX_STATE_NOT_STARTED:
+#ifdef WITH_WSREP
+ ut_d(trx->start_file = __FILE__);
+ ut_d(trx->start_line = __LINE__);
+#endif /* WITH_WSREP */
trx_start_low(trx);
/* fall through */
case TRX_STATE_ACTIVE:
@@ -2481,6 +2518,10 @@ trx_start_for_ddl_low(
trx->ddl = true;
+#ifdef WITH_WSREP
+ ut_d(trx->start_file = __FILE__);
+ ut_d(trx->start_line = __LINE__);
+#endif /* WITH_WSREP */
trx_start_low(trx);
return;