summaryrefslogtreecommitdiff
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r--storage/innobase/dict/dict0dict.c21
-rw-r--r--storage/innobase/fil/fil0fil.c1
-rw-r--r--storage/innobase/handler/ha_innodb.cc1383
-rw-r--r--storage/innobase/handler/ha_innodb.h41
-rw-r--r--storage/innobase/handler/handler0alter.cc4
-rw-r--r--storage/innobase/include/dict0mem.h3
-rw-r--r--storage/innobase/include/ha_prototypes.h13
-rw-r--r--storage/innobase/include/lock0lock.h3
-rw-r--r--storage/innobase/include/rem0rec.h9
-rw-r--r--storage/innobase/include/srv0srv.h12
-rw-r--r--storage/innobase/include/trx0sys.h35
-rw-r--r--storage/innobase/include/trx0trx.h3
-rw-r--r--storage/innobase/lock/lock0lock.c355
-rw-r--r--storage/innobase/os/os0file.c23
-rw-r--r--storage/innobase/rem/rem0rec.c133
-rw-r--r--storage/innobase/row/row0ins.c29
-rw-r--r--storage/innobase/row/row0upd.c299
-rw-r--r--storage/innobase/srv/srv0srv.c143
-rw-r--r--storage/innobase/trx/trx0roll.c27
-rw-r--r--storage/innobase/trx/trx0sys.c131
-rw-r--r--storage/innobase/trx/trx0trx.c32
-rw-r--r--storage/innobase/ut/ut0ut.c4
-rw-r--r--storage/tokudb/CMakeLists.txt2
-rw-r--r--storage/xtradb/buf/buf0buf.c2
-rw-r--r--storage/xtradb/dict/dict0dict.c21
-rw-r--r--storage/xtradb/handler/ha_innodb.cc1383
-rw-r--r--storage/xtradb/handler/ha_innodb.h41
-rw-r--r--storage/xtradb/handler/handler0alter.cc4
-rw-r--r--storage/xtradb/include/dict0mem.h3
-rw-r--r--storage/xtradb/include/ha_prototypes.h13
-rw-r--r--storage/xtradb/include/lock0lock.h3
-rw-r--r--storage/xtradb/include/rem0rec.h9
-rw-r--r--storage/xtradb/include/srv0srv.h12
-rw-r--r--storage/xtradb/include/trx0sys.h32
-rw-r--r--storage/xtradb/include/trx0trx.h3
-rw-r--r--storage/xtradb/lock/lock0lock.c356
-rw-r--r--storage/xtradb/os/os0file.c23
-rw-r--r--storage/xtradb/os/os0proc.c9
-rw-r--r--storage/xtradb/rem/rem0rec.c133
-rw-r--r--storage/xtradb/row/row0ins.c29
-rw-r--r--storage/xtradb/row/row0upd.c299
-rw-r--r--storage/xtradb/srv/srv0srv.c143
-rw-r--r--storage/xtradb/srv/srv0start.c12
-rw-r--r--storage/xtradb/trx/trx0roll.c27
-rw-r--r--storage/xtradb/trx/trx0sys.c123
-rw-r--r--storage/xtradb/trx/trx0trx.c26
46 files changed, 5380 insertions, 32 deletions
diff --git a/storage/innobase/dict/dict0dict.c b/storage/innobase/dict/dict0dict.c
index 0e4691658d5..1eb557811bd 100644
--- a/storage/innobase/dict/dict0dict.c
+++ b/storage/innobase/dict/dict0dict.c
@@ -2667,7 +2667,26 @@ next_rec:
return(NULL);
}
-
+#ifdef WITH_WSREP
+/**********************************************************************//**
+Thin wrapper that forwards all of its arguments, unchanged, to
+dict_foreign_find_index() and hands back that function's result.
+@return whatever dict_foreign_find_index() returns */
+dict_index_t*
+wsrep_dict_foreign_find_index(
+/*==========================*/
+	dict_table_t*	table,		/*!< in: table */
+	const char**	columns,	/*!< in: array of column names */
+	ulint		n_cols,		/*!< in: number of columns */
+	dict_index_t*	types_idx,	/*!< in: NULL or an index to whose
+					types the column types must match */
+	ibool		check_charsets,	/*!< in: whether to check charsets.
+					only has an effect if
+					types_idx != NULL */
+	ulint		check_null)	/*!< in: nonzero if none of the
+					columns must be declared NOT NULL */
+{
+	dict_index_t*	found;
+
+	found = dict_foreign_find_index(table, columns, n_cols,
+					types_idx, check_charsets,
+					check_null);
+
+	return(found);
+}
+#endif /* WITH_WSREP */
/**********************************************************************//**
Find an index that is equivalent to the one passed in and is not marked
for deletion.
diff --git a/storage/innobase/fil/fil0fil.c b/storage/innobase/fil/fil0fil.c
index b30e8056c0a..7aa1a42358e 100644
--- a/storage/innobase/fil/fil0fil.c
+++ b/storage/innobase/fil/fil0fil.c
@@ -4205,7 +4205,6 @@ fil_extend_space_to_desired_size(
}
mem_free(buf2);
-
fil_node_complete_io(node, fil_system, OS_FILE_WRITE);
complete_io:
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 2c2e2dcd6d6..748c0569636 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -94,6 +94,12 @@ extern "C" {
enum_tx_isolation thd_get_trx_isolation(const THD* thd);
+#ifdef WITH_WSREP
+#include "../storage/innobase/include/ut0byte.h"
+#ifndef EXTRA_DEBUG
+ //#include "../storage/innobase/include/ut0byte.ic"
+#endif /* EXTRA_DEBUG */
+#endif /* WITH_WSREP */
}
#include "ha_innodb.h"
@@ -103,6 +109,35 @@ enum_tx_isolation thd_get_trx_isolation(const THD* thd);
# define MYSQL_PLUGIN_IMPORT /* nothing */
# endif /* MYSQL_PLUGIN_IMPORT */
+#ifdef WITH_WSREP
+#include <wsrep_mysqld.h>
+#include <my_md5.h>
+extern my_bool wsrep_certify_nonPK;
+class binlog_trx_data;
+extern handlerton *binlog_hton;
+
+extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log;
+extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_wsrep_rollback;
+extern MYSQL_PLUGIN_IMPORT mysql_cond_t COND_wsrep_rollback;
+extern MYSQL_PLUGIN_IMPORT wsrep_aborting_thd_t wsrep_aborting_thd;
+
+/** Fetches the write set handle of a connection with wsrep_thd_ws_handle()
+and passes it, together with the id of the given InnoDB transaction, to
+wsrep_ws_handle_for_trx().
+@param thd  connection whose write set handle is fetched
+@param trx  transaction whose id is passed along
+@return value returned by wsrep_ws_handle_for_trx() */
+static inline wsrep_ws_handle_t*
+wsrep_ws_handle(THD* thd, const trx_t* trx)
+{
+	const wsrep_trx_id_t	innodb_trx_id = (wsrep_trx_id_t) trx->id;
+
+	return(wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd),
+				       innodb_trx_id));
+}
+
+extern bool wsrep_prepare_key_for_innodb(const uchar *cache_key,
+ size_t cache_key_len,
+ const uchar* row_id,
+ size_t row_id_len,
+ wsrep_buf_t* key,
+ size_t* key_len);
+
+extern handlerton * wsrep_hton;
+extern handlerton * binlog_hton;
+extern void wsrep_cleanup_transaction(THD *thd);
+#endif /* WITH_WSREP */
/** to protect innobase_open_files */
static mysql_mutex_t innobase_share_mutex;
/** to force correct commit order in binlog */
@@ -871,6 +906,15 @@ thd_to_trx(
{
return(*(trx_t**) thd_ha_data(thd, innodb_hton_ptr));
}
+#ifdef WITH_WSREP
+/********************************************************************//**
+Returns the id of the InnoDB transaction that thd_to_trx() yields for the
+given MySQL thread.
+@return id field of the transaction object */
+ulonglong
+thd_to_trx_id(
+/*==========*/
+	THD*	thd)	/*!< in: MySQL thread */
+{
+	const trx_t*	thd_trx = thd_to_trx(thd);
+
+	return(thd_trx->id);
+}
+#endif
/********************************************************************//**
Call this function when mysqld passes control to the client. That is to
@@ -902,6 +946,15 @@ innobase_release_temporary_latches(
return(0);
}
+#ifdef WITH_WSREP
+static int
+wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
+ my_bool signal);
+static void
+wsrep_fake_trx_id(handlerton* hton, THD *thd);
+static int innobase_wsrep_set_checkpoint(handlerton* hton, const XID* xid);
+static int innobase_wsrep_get_checkpoint(handlerton* hton, XID* xid);
+#endif
/********************************************************************//**
Increments innobase_active_counter and every INNOBASE_WAKE_INTERVALth
time calls srv_active_wake_master_thread. This function should be used
@@ -1315,6 +1368,9 @@ int
innobase_mysql_tmpfile(void)
/*========================*/
{
+#ifdef WITH_INNODB_DISALLOW_WRITES
+ os_event_wait(srv_allow_writes_event);
+#endif /* WITH_INNODB_DISALLOW_WRITES */
int fd2 = -1;
File fd;
@@ -2264,6 +2320,12 @@ innobase_init(
innobase_hton->flags=HTON_EXTENDED_KEYS;
innobase_hton->release_temporary_latches=innobase_release_temporary_latches;
innobase_hton->alter_table_flags = innobase_alter_table_flags;
+#ifdef WITH_WSREP
+ innobase_hton->wsrep_abort_transaction=wsrep_abort_transaction;
+ innobase_hton->wsrep_set_checkpoint=innobase_wsrep_set_checkpoint;
+ innobase_hton->wsrep_get_checkpoint=innobase_wsrep_get_checkpoint;
+ innobase_hton->wsrep_fake_trx_id=wsrep_fake_trx_id;
+#endif /* WITH_WSREP */
innobase_hton->kill_query = innobase_kill_query;
ut_a(DATA_MYSQL_TRUE_VARCHAR == (ulint)MYSQL_TYPE_VARCHAR);
@@ -2728,10 +2790,30 @@ innobase_commit_low(
/*================*/
trx_t* trx) /*!< in: transaction handle */
{
+#ifdef WITH_WSREP
+ THD* thd = (THD*)trx->mysql_thd;
+ const char* tmp = 0;
+ if (wsrep_on((void*)thd)) {
+#ifdef WSREP_PROC_INFO
+ char info[64];
+ info[sizeof(info) - 1] = '\0';
+ snprintf(info, sizeof(info) - 1,
+ "innobase_commit_low():trx_commit_for_mysql(%lld)",
+ (long long) wsrep_thd_trx_seqno(thd));
+ tmp = thd_proc_info(thd, info);
+
+#else
+ tmp = thd_proc_info(thd, "innobase_commit_low()");
+#endif /* WSREP_PROC_INFO */
+ }
+#endif /* WITH_WSREP */
if (trx_is_started(trx)) {
trx_commit_for_mysql(trx);
}
+#ifdef WITH_WSREP
+ if (wsrep_on((void*)thd)) { thd_proc_info(thd, tmp); }
+#endif /* WITH_WSREP */
}
/*****************************************************************//**
@@ -3395,7 +3477,14 @@ ha_innobase::max_supported_key_length() const
therefore set to slightly less than 1 / 4 of page size which
is 16 kB; but currently MySQL does not work with keys whose
size is > MAX_KEY_LENGTH */
+#ifdef WITH_WSREP
+ /* this may look like obsolete code, but this ifdef is here
+ just to make sure we will see bzr merge conflict, if Oracle
+ changes max key length */
+ return(3500);
+#else
return(3500);
+#endif
}
/****************************************************************//**
@@ -4481,7 +4570,119 @@ innobase_mysql_cmp(
return(0);
}
+#ifdef WITH_WSREP
+/*************************************************************//**
+Replaces, in place, the value of a string-like key field by the output of
+the charset's strnxfrm() function. Fields of any other MySQL type are left
+untouched.
+
+For wsrep_protocol_version < 3 the transformed value is written with the
+same length as the input. For later protocol versions strnxfrm() is given
+a destination of up to twice the input length, limited by both the local
+scratch buffer size and buf_length so that str is never overrun.
+@return length of the data now in str: str_length, except for
+wsrep_protocol_version >= 3 on string-like types, where it is the value
+returned by strnxfrm() */
+extern "C" UNIV_INTERN
+int
+wsrep_innobase_mysql_sort(
+/*======================*/
+	int		mysql_type,	/*!< in: MySQL type */
+	uint		charset_number,	/*!< in: number of the charset */
+	unsigned char*	str,		/*!< in/out: data field; receives
+					the transformed value */
+	unsigned int	str_length,	/*!< in: data field length,
+					not UNIV_SQL_NULL */
+	unsigned int	buf_length)	/*!< in: total str buffer length */
+
+{
+	CHARSET_INFO*		charset;
+	enum_field_types	mysql_tp;
+	int			ret_length = str_length;
+
+	DBUG_ASSERT(str_length != UNIV_SQL_NULL);
+	DBUG_ASSERT(str_length <= buf_length);
+
+	mysql_tp = (enum_field_types) mysql_type;
+
+	switch (mysql_tp) {
+
+	case MYSQL_TYPE_BIT:
+	case MYSQL_TYPE_STRING:
+	case MYSQL_TYPE_VAR_STRING:
+	case MYSQL_TYPE_TINY_BLOB:
+	case MYSQL_TYPE_MEDIUM_BLOB:
+	case MYSQL_TYPE_BLOB:
+	case MYSQL_TYPE_LONG_BLOB:
+	case MYSQL_TYPE_VARCHAR:
+	{
+		uchar	tmp_str[REC_VERSION_56_MAX_INDEX_COL_LEN];
+		uint	tmp_length = REC_VERSION_56_MAX_INDEX_COL_LEN;
+
+		/* Use the charset number to pick the right charset struct for
+		the comparison. Since the MySQL function get_charset may be
+		slow before Bar removes the mutex operation there, we first
+		look at 2 common charsets directly. */
+
+		if (charset_number == default_charset_info->number) {
+			charset = default_charset_info;
+		} else if (charset_number == my_charset_latin1.number) {
+			charset = &my_charset_latin1;
+		} else {
+			charset = get_charset(charset_number, MYF(MY_WME));
+
+			if (charset == NULL) {
+			  sql_print_error("InnoDB needs charset %lu for doing "
+					  "a comparison, but MySQL cannot "
+					  "find that charset.",
+					  (ulong) charset_number);
+				ut_a(0);
+			}
+		}
+
+		/* The input is copied aside because strnxfrm() below writes
+		its output over str. */
+		ut_a(str_length <= tmp_length);
+		memcpy(tmp_str, str, str_length);
+
+		if (wsrep_protocol_version < 3) {
+			tmp_length = charset->coll->strnxfrm(
+				charset, str, str_length,
+				tmp_str, str_length);
+			DBUG_ASSERT(tmp_length <= str_length);
+		} else {
+			/* strnxfrm will expand the destination string,
+			   protocols < 3 truncated the sorted string
+			   protocols >= 3 get the full sorted string
+			*/
+			/* 5.5 strnxfrm pads the tail with spaces and
+			   always returns the full destination buffer length.
+			   We cannot know how many characters were converted,
+			   so 2 * str_length is used here as the best guess.
+			*/
+			uint dst_length = str_length * 2;
+
+			if (dst_length > tmp_length) {
+				dst_length = tmp_length;
+			}
+
+			/* str is the destination: never let strnxfrm()
+			write past the capacity the caller declared. */
+			if (dst_length > buf_length) {
+				dst_length = buf_length;
+			}
+
+			tmp_length = charset->coll->strnxfrm(
+				charset, str, dst_length,
+				tmp_str, str_length);
+			DBUG_ASSERT(tmp_length <= buf_length);
+			ret_length = tmp_length;
+		}
+
+		break;
+	}
+	case MYSQL_TYPE_DECIMAL :
+	case MYSQL_TYPE_TINY :
+	case MYSQL_TYPE_SHORT :
+	case MYSQL_TYPE_LONG :
+	case MYSQL_TYPE_FLOAT :
+	case MYSQL_TYPE_DOUBLE :
+	case MYSQL_TYPE_NULL :
+	case MYSQL_TYPE_TIMESTAMP :
+	case MYSQL_TYPE_LONGLONG :
+	case MYSQL_TYPE_INT24 :
+	case MYSQL_TYPE_DATE :
+	case MYSQL_TYPE_TIME :
+	case MYSQL_TYPE_DATETIME :
+	case MYSQL_TYPE_YEAR :
+	case MYSQL_TYPE_NEWDATE :
+	case MYSQL_TYPE_NEWDECIMAL :
+	case MYSQL_TYPE_ENUM :
+	case MYSQL_TYPE_SET :
+	case MYSQL_TYPE_GEOMETRY :
+		/* Not transformed: the value stays as it is. */
+		break;
+	default:
+		break;
+	}
+
+	return ret_length;
+}
+#endif // WITH_WSREP
/**************************************************************//**
Converts a MySQL type to an InnoDB type. Note that this function returns
the 'mtype' of InnoDB. InnoDB differentiates between MySQL's old <= 4.1
@@ -4600,6 +4801,268 @@ innobase_read_from_2_little_endian(
/*******************************************************************//**
Stores a key value for a row to a buffer.
@return key value length as stored in buff */
+#ifdef WITH_WSREP
+/** Asserts that n bytes may be written at pos without running past end.
+@param pos  current write position
+@param end  one past the last writable byte
+@param n    number of bytes about to be written */
+static inline
+void
+wsrep_key_buf_check(
+/*================*/
+	const char*	pos,
+	const char*	end,
+	ulint		n)
+{
+	ut_a(pos <= end);
+	ut_a(n <= (ulint) (end - pos));
+}
+
+/*******************************************************************//**
+Builds, from a row in MySQL format, the key value of key number keynr
+into buff. The buffer is zero-filled first. For every key part that has a
+null bit, one flag byte (1 = NULL, 0 = not NULL) precedes the value.
+String-like values are passed through wsrep_innobase_mysql_sort() before
+being stored. Every write into buff, and every copy into the local
+scratch array, is range-checked before it happens.
+@return number of bytes of buff that were consumed */
+UNIV_INTERN
+uint
+wsrep_store_key_val_for_row(
+/*========================*/
+	TABLE*		table,	/*!< in: MySQL table object */
+	uint		keynr,	/*!< in: key number */
+	char*		buff,	/*!< in/out: buffer for the key value (in MySQL
+				format) */
+	uint		buff_len,/*!< in: buffer length */
+	const uchar*	record,	/*!< in: row in MySQL format */
+	ibool*		key_is_null)/*!< out: full key was null */
+{
+	KEY*		key_info	= table->key_info + keynr;
+	KEY_PART_INFO*	key_part	= key_info->key_part;
+	KEY_PART_INFO*	end		= key_part + key_info->key_parts;
+	char*		buff_start	= buff;
+	const char*	buff_end	= buff + buff_len;
+	enum_field_types mysql_type;
+	Field*		field;
+
+	DBUG_ENTER("wsrep_store_key_val_for_row");
+
+	bzero(buff, buff_len);
+	*key_is_null = TRUE;
+
+	for (; key_part != end; key_part++) {
+		uchar sorted[REC_VERSION_56_MAX_INDEX_COL_LEN] = {'\0'};
+		ibool part_is_null = FALSE;
+
+		if (key_part->null_bit) {
+			wsrep_key_buf_check(buff, buff_end, 1);
+
+			if (record[key_part->null_offset] &
+			    key_part->null_bit) {
+				*buff = 1;
+				part_is_null = TRUE;
+			} else {
+				*buff = 0;
+			}
+			buff++;
+		}
+		if (!part_is_null)  *key_is_null = FALSE;
+
+		field = key_part->field;
+		mysql_type = field->type();
+
+		if (mysql_type == MYSQL_TYPE_VARCHAR) {
+						/* >= 5.0.3 true VARCHAR */
+			ulint		lenlen;
+			ulint		len;
+			const byte*	data;
+			ulint		key_len;
+			ulint		true_len;
+			CHARSET_INFO*	cs;
+			int		error=0;
+
+			key_len = key_part->length;
+
+			if (part_is_null) {
+				/* Skipped space stays zero from the bzero()
+				above. */
+				buff += key_len + 2;
+
+				continue;
+			}
+			cs = field->charset();
+
+			lenlen = (ulint)
+				(((Field_varstring*)field)->length_bytes);
+
+			data = row_mysql_read_true_varchar(&len,
+				(byte*) (record
+				+ (ulint)get_field_offset(table, field)),
+				lenlen);
+
+			true_len = len;
+
+			/* For multi byte character sets we need to calculate
+			the true length of the key */
+
+			if (len > 0 && cs->mbmaxlen > 1) {
+				true_len = (ulint) cs->cset->well_formed_len(cs,
+						(const char *) data,
+						(const char *) data + len,
+						(uint) (key_len /
+							cs->mbmaxlen),
+						&error);
+			}
+
+			/* In a column prefix index, we may need to truncate
+			the stored value: */
+
+			if (true_len > key_len) {
+				true_len = key_len;
+			}
+
+			ut_a(true_len <= sizeof(sorted));
+			memcpy(sorted, data, true_len);
+			true_len = wsrep_innobase_mysql_sort(
+				mysql_type, cs->number, sorted, true_len,
+				REC_VERSION_56_MAX_INDEX_COL_LEN);
+
+			if (wsrep_protocol_version > 1) {
+				wsrep_key_buf_check(buff, buff_end, true_len);
+				memcpy(buff, sorted, true_len);
+				/* Only the transformed bytes are stored and
+				counted; nothing is reserved beyond them. */
+				buff += true_len;
+			} else {
+				/* Protocol versions <= 1 store no data here:
+				key_len zero bytes (from the bzero() above)
+				are reserved instead. */
+				buff += key_len;
+			}
+		} else if (mysql_type == MYSQL_TYPE_TINY_BLOB
+			|| mysql_type == MYSQL_TYPE_MEDIUM_BLOB
+			|| mysql_type == MYSQL_TYPE_BLOB
+			|| mysql_type == MYSQL_TYPE_LONG_BLOB
+			/* MYSQL_TYPE_GEOMETRY data is treated
+			as BLOB data in innodb. */
+			|| mysql_type == MYSQL_TYPE_GEOMETRY) {
+
+			CHARSET_INFO*	cs;
+			ulint		key_len;
+			ulint		true_len;
+			int		error=0;
+			ulint		blob_len;
+			const byte*	blob_data;
+
+			ut_a(key_part->key_part_flag & HA_PART_KEY_SEG);
+
+			key_len = key_part->length;
+
+			if (part_is_null) {
+				buff += key_len + 2;
+
+				continue;
+			}
+
+			cs = field->charset();
+
+			blob_data = row_mysql_read_blob_ref(&blob_len,
+				(byte*) (record
+				+ (ulint)get_field_offset(table, field)),
+				(ulint) field->pack_length());
+
+			true_len = blob_len;
+
+			ut_a(get_field_offset(table, field)
+				== key_part->offset);
+
+			/* For multi byte character sets we need to calculate
+			the true length of the key */
+
+			if (blob_len > 0 && cs->mbmaxlen > 1) {
+				true_len = (ulint) cs->cset->well_formed_len(cs,
+						(const char *) blob_data,
+						(const char *) blob_data
+							+ blob_len,
+						(uint) (key_len /
+							cs->mbmaxlen),
+						&error);
+			}
+
+			/* All indexes on BLOB and TEXT are column prefix
+			indexes, and we may need to truncate the data to be
+			stored in the key value: */
+
+			if (true_len > key_len) {
+				true_len = key_len;
+			}
+
+			/* GEOMETRY is not length-checked inside
+			wsrep_innobase_mysql_sort(), so check here. */
+			ut_a(true_len <= sizeof(sorted));
+			memcpy(sorted, blob_data, true_len);
+			true_len = wsrep_innobase_mysql_sort(
+				mysql_type, cs->number, sorted, true_len,
+				REC_VERSION_56_MAX_INDEX_COL_LEN);
+
+			wsrep_key_buf_check(buff, buff_end, true_len);
+			memcpy(buff, sorted, true_len);
+
+			/* Protocol versions <= 1 reserve the full prefix
+			length; later ones count only the stored bytes. */
+			if (wsrep_protocol_version > 1) {
+				buff += true_len;
+			} else {
+				buff += key_len;
+			}
+		} else {
+			/* Here we handle all other data types except the
+			true VARCHAR, BLOB and TEXT. Note that the column
+			value we store may be also in a column prefix
+			index. */
+
+			CHARSET_INFO*		cs;
+			ulint			true_len;
+			ulint			key_len;
+			const uchar*		src_start;
+			int			error=0;
+			enum_field_types	real_type;
+
+			key_len = key_part->length;
+
+			if (part_is_null) {
+				buff += key_len;
+
+				continue;
+			}
+
+			src_start = record + key_part->offset;
+			real_type = field->real_type();
+			true_len = key_len;
+
+			/* Character set for the field is defined only
+			to fields whose type is string and real field
+			type is not enum or set. For these fields check
+			if character set is multi byte. */
+
+			if (real_type != MYSQL_TYPE_ENUM
+				&& real_type != MYSQL_TYPE_SET
+				&& ( mysql_type == MYSQL_TYPE_VAR_STRING
+					|| mysql_type == MYSQL_TYPE_STRING)) {
+
+				cs = field->charset();
+
+				/* For multi byte character sets we need to
+				calculate the true length of the key */
+
+				if (key_len > 0 && cs->mbmaxlen > 1) {
+
+					true_len = (ulint)
+						cs->cset->well_formed_len(cs,
+							(const char *)src_start,
+							(const char *)src_start
+								+ key_len,
+							(uint) (key_len /
+								cs->mbmaxlen),
+							&error);
+				}
+				ut_a(true_len <= sizeof(sorted));
+				memcpy(sorted, src_start, true_len);
+				true_len = wsrep_innobase_mysql_sort(
+					mysql_type, cs->number, sorted, true_len,
+					REC_VERSION_56_MAX_INDEX_COL_LEN);
+
+				wsrep_key_buf_check(buff, buff_end, true_len);
+				memcpy(buff, sorted, true_len);
+			} else {
+				wsrep_key_buf_check(buff, buff_end, true_len);
+				memcpy(buff, src_start, true_len);
+			}
+			buff += true_len;
+
+			/* Pad the unused space with spaces. */
+
+#ifdef REMOVED
+			if (true_len < key_len) {
+				ulint pad_len = key_len - true_len;
+				ut_a(!(pad_len % cs->mbminlen));
+
+				cs->cset->fill(cs, buff, pad_len,
+					       0x20 /* space */);
+				buff += pad_len;
+			}
+#endif /* REMOVED */
+		}
+	}
+
+	/* Also covers the space skipped for NULL key parts. */
+	ut_a(buff <= buff_start + buff_len);
+
+	DBUG_RETURN((uint)(buff - buff_start));
+}
+#endif /* WITH_WSREP */
UNIV_INTERN
uint
ha_innobase::store_key_val_for_row(
@@ -5203,6 +5666,9 @@ ha_innobase::write_row(
ulint error = 0;
int error_result= 0;
ibool auto_inc_used= FALSE;
+#ifdef WITH_WSREP
+ ibool auto_inc_inserted= FALSE; /* if NULL was inserted */
+#endif
ulint sql_command;
trx_t* trx = thd_to_trx(user_thd);
@@ -5233,8 +5699,20 @@ ha_innobase::write_row(
if ((sql_command == SQLCOM_ALTER_TABLE
|| sql_command == SQLCOM_OPTIMIZE
|| sql_command == SQLCOM_CREATE_INDEX
+#ifdef WITH_WSREP
+ || (wsrep_on(user_thd) && wsrep_load_data_splitting &&
+ sql_command == SQLCOM_LOAD &&
+ !thd_test_options(
+ user_thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
+#endif /* WITH_WSREP */
|| sql_command == SQLCOM_DROP_INDEX)
&& num_write_row >= 10000) {
+#ifdef WITH_WSREP
+ if (wsrep_on(user_thd) && sql_command == SQLCOM_LOAD) {
+ WSREP_DEBUG("forced trx split for LOAD: %s",
+ wsrep_thd_query(user_thd));
+ }
+#endif /* WITH_WSREP */
/* ALTER TABLE is COMMITted at every 10000 copied rows.
The IX table lock for the original table has to be re-issued.
As this method will be called on a temporary table where the
@@ -5268,6 +5746,21 @@ no_commit:
*/
;
} else if (src_table == prebuilt->table) {
+#ifdef WITH_WSREP
+ switch (wsrep_run_wsrep_commit(user_thd, wsrep_hton, 1))
+ {
+ case WSREP_TRX_OK:
+ break;
+ case WSREP_TRX_SIZE_EXCEEDED:
+ case WSREP_TRX_CERT_FAIL:
+ case WSREP_TRX_ERROR:
+ DBUG_RETURN(1);
+ }
+
+ if (binlog_hton->commit(binlog_hton, user_thd, 1))
+ DBUG_RETURN(1);
+ wsrep_post_commit(user_thd, TRUE);
+#endif /* WITH_WSREP */
/* Source table is not in InnoDB format:
no need to re-acquire locks on it. */
@@ -5278,6 +5771,20 @@ no_commit:
/* We will need an IX lock on the destination table. */
prebuilt->sql_stat_start = TRUE;
} else {
+#ifdef WITH_WSREP
+ switch (wsrep_run_wsrep_commit(user_thd, wsrep_hton, 1))
+ {
+ case WSREP_TRX_OK:
+ break;
+ case WSREP_TRX_SIZE_EXCEEDED:
+ case WSREP_TRX_CERT_FAIL:
+ case WSREP_TRX_ERROR:
+ DBUG_RETURN(1);
+ }
+ if (binlog_hton->commit(binlog_hton, user_thd, 1))
+ DBUG_RETURN(1);
+ wsrep_post_commit(user_thd, TRUE);
+#endif /* WITH_WSREP */
/* Ensure that there are no other table locks than
LOCK_IX and LOCK_AUTO_INC on the destination table. */
@@ -5306,7 +5813,9 @@ no_commit:
/* Reset the error code before calling
innobase_get_auto_increment(). */
prebuilt->autoinc_error = DB_SUCCESS;
-
+#ifdef WITH_WSREP
+ auto_inc_inserted= (table->next_number_field->val_int() == 0);
+#endif
if ((error = update_auto_increment())) {
/* We don't want to mask autoinc overflow errors. */
@@ -5389,6 +5898,30 @@ no_commit:
case SQLCOM_REPLACE_SELECT:
goto set_max_autoinc;
+#ifdef WITH_WSREP
+ /* workaround for LP bug #355000, retrying the insert */
+ case SQLCOM_INSERT:
+ if (wsrep_on(current_thd) &&
+ auto_inc_inserted &&
+ wsrep_drupal_282555_workaround &&
+ !thd_test_options(current_thd,
+ OPTION_NOT_AUTOCOMMIT |
+ OPTION_BEGIN)) {
+ WSREP_DEBUG(
+ "retrying insert: %s",
+ (*wsrep_thd_query(current_thd)) ?
+ wsrep_thd_query(current_thd) :
+ (char *)"void");
+ error= DB_SUCCESS;
+ wsrep_thd_set_conflict_state(
+ current_thd, MUST_ABORT);
+ innodb_srv_conc_exit_innodb(prebuilt->trx);
+ /* jump straight to func exit over
+ * later wsrep hooks */
+ goto func_exit;
+ }
+ break;
+#endif
default:
break;
}
@@ -5437,6 +5970,20 @@ report_error:
error_result = convert_error_code_to_mysql((int) error,
prebuilt->table->flags,
user_thd);
+#ifdef WITH_WSREP
+ if (!error_result && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
+ wsrep_on(user_thd) && !wsrep_consistency_check(user_thd) &&
+ (sql_command != SQLCOM_LOAD ||
+ thd_binlog_format(user_thd) == BINLOG_FORMAT_ROW)) {
+
+ if (wsrep_append_keys(user_thd, false, record, NULL)) {
+ DBUG_PRINT("wsrep", ("row key failed"));
+ error_result = HA_ERR_INTERNAL_ERROR;
+ goto wsrep_error;
+ }
+ }
+wsrep_error:
+#endif
func_exit:
innobase_active_small();
@@ -5592,7 +6139,84 @@ calc_row_difference(
return(0);
}
+#ifdef WITH_WSREP
+/**********************************************************************//**
+Computes an MD5 digest over all fields of a row in MySQL format. For each
+field a one-byte marker is hashed first (0 for SQL NULL, 1 otherwise);
+for non-NULL fields the field data follows. BLOB columns and true
+VARCHAR columns are hashed by their payload rather than by their in-row
+representation.
+@return always 0 */
+static
+int
+wsrep_calc_row_hash(
+/*================*/
+	byte*		digest,		/*!< in/out: md5 sum */
+	const uchar*	row,		/*!< in: row in MySQL format */
+	TABLE*		table,		/*!< in: table in MySQL data
+					dictionary */
+	row_prebuilt_t*	prebuilt,	/*!< in: InnoDB prebuilt struct */
+	THD*		thd)		/*!< in: user thread */
+{
+	my_MD5Context	md5;
+	byte		null_marker = 0;
+	byte		value_marker = 1;
+	const uint	field_count = table->s->fields;
+
+	my_MD5Init (&md5);
+
+	for (uint col = 0; col < field_count; col++) {
+		Field*			fld = table->field[col];
+		const byte*		data = (const byte*) row
+			+ get_field_offset(table, fld);
+		ulint			data_len = fld->pack_length();
+		enum_field_types	fld_type = fld->type();
+		ulint			mtype
+			= prebuilt->table->cols[col].mtype;
+
+		if (mtype == DATA_BLOB) {
+			/* Follow the reference to the BLOB payload. */
+			data = row_mysql_read_blob_ref(
+				&data_len, data, data_len);
+		} else if ((mtype == DATA_VARCHAR
+			    || mtype == DATA_BINARY
+			    || mtype == DATA_VARMYSQL)
+			   && fld_type == MYSQL_TYPE_VARCHAR) {
+			/* A >= 5.0.3 type true VARCHAR keeps the real
+			payload length in 1 or 2 leading bytes. */
+			data = row_mysql_read_true_varchar(
+				&data_len, data,
+				(ulint)
+				(((Field_varstring*)fld)->length_bytes));
+		}
+
+		const bool	is_null = fld->null_ptr
+			&& field_in_record_is_null(table, fld, (char*) row);
+
+		if (is_null) {
+			my_MD5Update (&md5, &null_marker, 1);
+			continue;
+		}
+
+		my_MD5Update (&md5, &value_marker, 1);
+		my_MD5Update (&md5, data, data_len);
+	}
+
+	my_MD5Final (digest, &md5);
+
+	return(0);
+}
+#endif /* WITH_WSREP */
/**********************************************************************//**
Updates a row given as a parameter to a new value. Note that we are given
whole rows, not just the fields which are updated: this incurs some
@@ -5718,6 +6342,20 @@ ha_innobase::update_row(
innobase_active_small();
+#ifdef WITH_WSREP
+ if (!error && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
+ wsrep_on(user_thd)) {
+
+ DBUG_PRINT("wsrep", ("update row key"));
+
+ if (wsrep_append_keys(user_thd, false, old_row, new_row)) {
+ DBUG_PRINT("wsrep", ("row key failed"));
+ error = HA_ERR_INTERNAL_ERROR;
+ goto wsrep_error;
+ }
+ }
+wsrep_error:
+#endif
DBUG_RETURN(error);
}
@@ -5761,6 +6399,18 @@ ha_innobase::delete_row(
innobase_active_small();
+#ifdef WITH_WSREP
+ if (!error && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
+ wsrep_on(user_thd)) {
+
+ if (wsrep_append_keys(user_thd, false, record, NULL)) {
+ DBUG_PRINT("wsrep", ("delete fail"));
+ error = HA_ERR_INTERNAL_ERROR;
+ goto wsrep_error;
+ }
+ }
+wsrep_error:
+#endif
DBUG_RETURN(error);
}
@@ -6550,7 +7200,393 @@ ha_innobase::rnd_pos(
DBUG_RETURN(error);
}
+#ifdef WITH_WSREP
+extern "C" {
+dict_index_t*
+wsrep_dict_foreign_find_index(
+ dict_table_t* table,
+ const char** columns,
+ ulint n_cols,
+ dict_index_t* types_idx,
+ ibool check_charsets,
+ ulint check_null);
+
+/*********************************************************************//**
+Builds a key from a clustered index record for one side of a foreign key
+constraint and appends it to the write set of the transaction through
+wsrep->append_key(). Does nothing and returns DB_SUCCESS when wsrep is
+not on for the connection or the connection is not in LOCAL_STATE.
+
+If the table on the requested side is not yet attached to the constraint,
+it is looked up under dict_sys->mutex and the matching index is resolved.
+
+The key consists of one byte holding the ordinal of the target index
+among the table's indexes (indexes named innobase_index_reserve_name are
+not counted), followed by the bytes produced by
+wsrep_rec_get_foreign_key().
+@return DB_SUCCESS, DB_ERROR, or the error code of
+wsrep_rec_get_foreign_key() */
+ulint
+wsrep_append_foreign_key(
+/*===========================*/
+	trx_t*		trx,		/*!< in: trx */
+	dict_foreign_t*	foreign,	/*!< in: foreign key constraint */
+	const rec_t*	rec,		/*!<in: clustered index record */
+	dict_index_t*	index,		/*!<in: clustered index */
+	ibool		referenced,	/*!<in: is check for referenced table */
+	ibool		shared)		/*!<in: is shared access */
+{
+	ut_a(trx);
+	THD*  thd = (THD*)trx->mysql_thd;
+	ulint rcode = DB_SUCCESS;
+	char  cache_key[513] = {'\0'};
+	int   cache_key_len;
+	bool const copy = true;
+
+	if (!wsrep_on(trx->mysql_thd) ||
+	    wsrep_thd_exec_mode(thd) != LOCAL_STATE)
+		return DB_SUCCESS;
+
+	if (!thd || !foreign ||
+	    (!foreign->referenced_table && !foreign->foreign_table))
+	{
+		WSREP_INFO("FK: %s missing in: %s",
+			(!thd)      ?  "thread"     :
+			((!foreign) ?  "constraint" :
+			((!foreign->referenced_table) ?
+			     "referenced table" : "foreign table")),
+			   (thd && wsrep_thd_query(thd)) ?
+			   wsrep_thd_query(thd) : "void");
+		return DB_ERROR;
+	}
+
+	if ( !((referenced) ?
+		foreign->referenced_table : foreign->foreign_table))
+	{
+		WSREP_DEBUG("pulling %s table into cache",
+			    (referenced) ? "referenced" : "foreign");
+		mutex_enter(&(dict_sys->mutex));
+		if (referenced)
+		{
+			foreign->referenced_table =
+				dict_table_get_low(
+					foreign->referenced_table_name_lookup, DICT_ERR_IGNORE_NONE);
+			if (foreign->referenced_table)
+			{
+				foreign->referenced_index =
+					wsrep_dict_foreign_find_index(
+						foreign->referenced_table,
+						foreign->referenced_col_names,
+						foreign->n_fields,
+						foreign->foreign_index,
+						TRUE, FALSE);
+			}
+		}
+		else
+		{
+			foreign->foreign_table =
+				dict_table_get_low(
+					foreign->foreign_table_name_lookup, DICT_ERR_IGNORE_NONE);
+			if (foreign->foreign_table)
+			{
+				foreign->foreign_index =
+					wsrep_dict_foreign_find_index(
+						foreign->foreign_table,
+						foreign->foreign_col_names,
+						foreign->n_fields,
+						foreign->referenced_index,
+						TRUE, FALSE);
+			}
+		}
+		mutex_exit(&(dict_sys->mutex));
+	}
+
+	if ( !((referenced) ?
+		foreign->referenced_table : foreign->foreign_table))
+	{
+		WSREP_WARN("FK: %s missing in query: %s",
+			   (!foreign->referenced_table) ?
+			   "referenced table" : "foreign table",
+			   (wsrep_thd_query(thd)) ?
+			   wsrep_thd_query(thd) : "void");
+		return DB_ERROR;
+	}
+
+	/* Table whose name goes into the key. Protocol versions <= 1 always
+	use the foreign table, which the check above has verified only when
+	referenced is FALSE; so it has to be verified separately here. */
+	dict_table_t* key_table = (wsrep_protocol_version > 1) ?
+		((referenced) ?
+			foreign->referenced_table :
+			foreign->foreign_table) :
+		foreign->foreign_table;
+
+	if (!key_table)
+	{
+		WSREP_WARN("FK: %s missing in query: %s",
+			   "foreign table",
+			   (wsrep_thd_query(thd)) ?
+			   wsrep_thd_query(thd) : "void");
+		return DB_ERROR;
+	}
+
+	byte  key[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+	ulint len = WSREP_MAX_SUPPORTED_KEY_LENGTH;
+
+	dict_index_t *idx_target = (referenced) ?
+		foreign->referenced_index : index;
+	dict_index_t *idx = (referenced) ?
+		UT_LIST_GET_FIRST(foreign->referenced_table->indexes) :
+		UT_LIST_GET_FIRST(foreign->foreign_table->indexes);
+	int i = 0;
+	/* Count the position of the target index among the table's indexes,
+	leaving out those named innobase_index_reserve_name. */
+	while (idx != NULL && idx != idx_target) {
+		if (innobase_strcasecmp (idx->name, innobase_index_reserve_name) != 0) {
+			i++;
+		}
+		idx = UT_LIST_GET_NEXT(indexes, idx);
+	}
+	ut_a(idx);
+	key[0] = (char)i;
+
+	rcode = wsrep_rec_get_foreign_key(
+		&key[1], &len, rec, index, idx,
+		wsrep_protocol_version > 1);
+	if (rcode != DB_SUCCESS) {
+		WSREP_ERROR(
+			"FK key set failed: %lu (%lu %lu), index: %s %s, %s",
+			rcode, referenced, shared,
+			(index && index->name) ? index->name :
+				"void index",
+			(index && index->table_name) ? index->table_name :
+				"void table",
+			(wsrep_thd_query(thd)) ?
+			wsrep_thd_query(thd) : "void");
+		return rcode;
+	}
+
+	/* cache_key is zero-initialized and one byte longer than the copy
+	limit, so it is always NUL-terminated. */
+	strncpy(cache_key, key_table->name, sizeof(cache_key) - 1);
+	cache_key_len = strlen(cache_key);
+#ifdef WSREP_DEBUG_PRINT
+	ulint j;
+	fprintf(stderr, "FK parent key, table: %s %s len: %lu ",
+		cache_key, (shared) ? "shared" : "exclusive", len+1);
+	for (j=0; j<len+1; j++) {
+		fprintf(stderr, " %hhX, ", key[j]);
+	}
+	fprintf(stderr, "\n");
+#endif
+	/* Replace the first '/' of the name by a NUL; the length passed on
+	below still covers the whole original name plus its terminator. */
+	char *p = strchr(cache_key, '/');
+	if (p) {
+		*p = '\0';
+	} else {
+		/* Either table may be unattached at this point. */
+		WSREP_WARN("unexpected foreign key table %s %s",
+			   (foreign->referenced_table) ?
+			   foreign->referenced_table->name : "void",
+			   (foreign->foreign_table) ?
+			   foreign->foreign_table->name : "void");
+	}
+
+	wsrep_buf_t wkey_part[3];
+        wsrep_key_t wkey = {wkey_part, 3};
+	if (!wsrep_prepare_key_for_innodb(
+		(const uchar*)cache_key,
+		cache_key_len +  1,
+		(const uchar*)key, len+1,
+		wkey_part,
+		&wkey.key_parts_num)) {
+		WSREP_WARN("key prepare failed for cascaded FK: %s",
+			   (wsrep_thd_query(thd)) ?
+			   wsrep_thd_query(thd) : "void");
+		return DB_ERROR;
+	}
+	rcode = (int)wsrep->append_key(
+		wsrep,
+		wsrep_ws_handle(thd, trx),
+		&wkey,
+		1,
+		shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE,
+		copy);
+	if (rcode) {
+		DBUG_PRINT("wsrep", ("row key failed: %lu", rcode));
+		WSREP_ERROR("Appending cascaded fk row key failed: %s, %lu",
+			    (wsrep_thd_query(thd)) ?
+			    wsrep_thd_query(thd) : "void", rcode);
+		return DB_ERROR;
+	}
+
+	return DB_SUCCESS;
+}
+}
+
+/*********************************************************************//**
+Hands one key to the wsrep provider for the given transaction. The key is
+assembled by wsrep_prepare_key_for_innodb() from the table cache key of
+table_share and the supplied key bytes, and is then passed to
+wsrep->append_key() as a shared or exclusive key.
+@return 0 on success, HA_ERR_INTERNAL_ERROR if the key could not be
+prepared, otherwise the nonzero code returned by wsrep->append_key() */
+static int
+wsrep_append_key(
+/*==================*/
+	THD		*thd,
+	trx_t 		*trx,
+	TABLE_SHARE 	*table_share,
+	TABLE 		*table,
+	const char*	key,
+	uint16_t        key_len,
+	bool            shared
+)
+{
+	DBUG_ENTER("wsrep_append_key");
+	bool const copy_key = true;
+#ifdef WSREP_DEBUG_PRINT
+	fprintf(stderr, "%s conn %ld, trx %llu, keylen %d, table %s ",
+		(shared) ? "Shared" : "Exclusive",
+		wsrep_thd_thread_id(thd), trx->id, key_len,
+		table_share->table_name.str);
+	for (int i=0; i<key_len; i++) {
+		fprintf(stderr, "%hhX, ", key[i]);
+	}
+	fprintf(stderr, "\n");
+#endif
+	wsrep_buf_t	parts[3];
+	wsrep_key_t	ws_key = {parts, 3};
+
+	const bool	prepared = wsrep_prepare_key_for_innodb(
+		(const uchar*)table_share->table_cache_key.str,
+		table_share->table_cache_key.length,
+		(const uchar*)key, key_len,
+		parts,
+		&ws_key.key_parts_num);
+
+	if (!prepared) {
+		WSREP_WARN("key prepare failed for: %s",
+			   (wsrep_thd_query(thd)) ?
+			   wsrep_thd_query(thd) : "void");
+		DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+	}
+
+	int	rc = (int)wsrep->append_key(
+		wsrep,
+		wsrep_ws_handle(thd, trx),
+		&ws_key,
+		1,
+		shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE,
+		copy_key);
+
+	if (rc == 0) {
+		DBUG_RETURN(0);
+	}
+
+	DBUG_PRINT("wsrep", ("row key failed: %d", rc));
+	WSREP_WARN("Appending row key failed: %s, %d",
+		   (wsrep_thd_query(thd)) ?
+		   wsrep_thd_query(thd) : "void", rc);
+	DBUG_RETURN(rc);
+}
+
+/*********************************************************************//**
+Appends the keys of a changed row to the wsrep write set of the
+connection's transaction. Rows of temporary tables are skipped.
+
+With wsrep_protocol_version 0 only key number 0 of record0 is appended.
+Otherwise every key is considered, and a key is appended when the table
+has no unique key at all, the key is unique, or it is referenced by a
+foreign key; each such key value is prefixed by one byte holding the key
+number. When record1 is given, its key is appended as well whenever it
+differs from the key of record0.
+
+If none of the appended keys counted (see key_appended) and
+wsrep_certify_nonPK is set, an MD5 digest of the whole row is appended
+as the key instead.
+@return 0 on success, otherwise the error code of wsrep_append_key() */
+int
+ha_innobase::wsrep_append_keys(
+/*==================*/
+	THD 		*thd,
+	bool		shared,
+	const uchar*	record0,	/* in: row in MySQL format */
+	const uchar*	record1)	/* in: row in MySQL format, or NULL */
+{
+	DBUG_ENTER("wsrep_append_keys");
+
+	bool key_appended = false;
+	trx_t *trx = thd_to_trx(thd);
+
+	if (table_share && table_share->tmp_table  != NO_TMP_TABLE) {
+		WSREP_DEBUG("skipping tmp table DML: THD: %lu tmp: %d SQL: %s",
+			    wsrep_thd_thread_id(thd),
+			    table_share->tmp_table,
+			    (wsrep_thd_query(thd)) ?
+			    wsrep_thd_query(thd) : "void");
+		DBUG_RETURN(0);
+	}
+
+	if (wsrep_protocol_version == 0) {
+		uint	len;
+		char 	keyval[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+		char 	*key 		= &keyval[0];
+		ibool    is_null;
+
+		len = wsrep_store_key_val_for_row(
+			table, 0, key, WSREP_MAX_SUPPORTED_KEY_LENGTH,
+			record0, &is_null);
+
+		if (!is_null) {
+			int rcode = wsrep_append_key(
+				thd, trx, table_share, table, keyval,
+				len, shared);
+			if (rcode) DBUG_RETURN(rcode);
+		}
+		else
+		{
+			WSREP_DEBUG("NULL key skipped (proto 0): %s",
+				    (wsrep_thd_query(thd)) ?
+				    wsrep_thd_query(thd) : "void");
+		}
+	} else {
+		ut_a(table->s->keys <= 256);
+		uint i;
+		bool hasPK= false;
+
+		/* Despite its name, hasPK is set by any unique key. A unique
+		key other than the primary key clears the PA-safe flag. */
+		for (i=0; i<table->s->keys; ++i) {
+			KEY*  key_info	= table->key_info + i;
+			if (key_info->flags & HA_NOSAME) {
+				hasPK = true;
+				if (i != table->s->primary_key) {
+					wsrep_thd_set_PA_safe(thd, FALSE);
+				}
+			}
+		}
+
+		for (i=0; i<table->s->keys; ++i) {
+			char  keyval0[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+			char  keyval1[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+			char* key0 		= &keyval0[1];
+			char* key1 		= &keyval1[1];
+			KEY*  key_info	= table->key_info + i;
+			ibool is_null;
+
+			dict_index_t* idx  = innobase_get_index(i);
+			dict_table_t* tab  = (idx) ? idx->table : NULL;
+
+			/* First byte of every key value is the key number. */
+			keyval0[0] = (char)i;
+			keyval1[0] = (char)i;
+
+			if (!tab) {
+				WSREP_WARN("MySQL-InnoDB key mismatch %s %s",
+					   table->s->table_name.str,
+					   key_info->name);
+			}
+			/* !hasPK == table with no PK, must append all non-unique keys */
+			if (!hasPK || key_info->flags & HA_NOSAME ||
+			    ((tab &&
+			      dict_table_get_referenced_constraint(tab, idx)) ||
+			     (!tab && referenced_by_foreign_key()))) {
+
+				const uint len0 = wsrep_store_key_val_for_row(
+					table, i, key0,
+					WSREP_MAX_SUPPORTED_KEY_LENGTH,
+					record0, &is_null);
+				if (!is_null) {
+					int rcode = wsrep_append_key(
+						thd, trx, table_share, table,
+						keyval0, len0+1, shared);
+					if (rcode) DBUG_RETURN(rcode);
+
+					if (key_info->flags & HA_NOSAME || shared)
+						key_appended = true;
+				}
+				else
+				{
+					WSREP_DEBUG("NULL key skipped: %s",
+						    (wsrep_thd_query(thd)) ?
+						    wsrep_thd_query(thd) :
+						    "void");
+				}
+				if (record1) {
+					const uint len1 =
+						wsrep_store_key_val_for_row(
+						table, i, key1,
+						WSREP_MAX_SUPPORTED_KEY_LENGTH,
+						record1, &is_null);
+					/* Keys of different length always
+					differ; comparing only len1 bytes
+					would miss a new key that is a
+					prefix of the old one. */
+					if (!is_null &&
+					    (len1 != len0 ||
+					     memcmp(key0, key1, len1))) {
+						int rcode = wsrep_append_key(
+							thd, trx, table_share,
+							table,
+							keyval1, len1+1, shared);
+						if (rcode) DBUG_RETURN(rcode);
+					}
+				}
+			}
+		}
+	}
+
+	/* if no PK, calculate hash of full row, to be the key value */
+	if (!key_appended && wsrep_certify_nonPK) {
+		uchar digest[16];
+		int rcode;
+
+		wsrep_calc_row_hash(digest, record0, table, prebuilt, thd);
+		if ((rcode = wsrep_append_key(thd, trx, table_share, table,
+					      (const char*) digest, 16,
+					      shared))) {
+			DBUG_RETURN(rcode);
+		}
+
+		if (record1) {
+			wsrep_calc_row_hash(
+				digest, record1, table, prebuilt, thd);
+			if ((rcode = wsrep_append_key(thd, trx, table_share,
+						      table,
+						      (const char*) digest,
+						      16, shared))) {
+				DBUG_RETURN(rcode);
+			}
+		}
+	}
+	DBUG_RETURN(0);
+}
+#endif
/*********************************************************************//**
Stores a reference to the current row to 'ref' field of the handle. Note
that in the case where we have generated the clustered index for the
@@ -9445,11 +10481,18 @@ ha_innobase::external_lock(
/* used by test case */
DBUG_EXECUTE_IF("no_innodb_binlog_errors", skip = 1;);
if (!skip) {
+#ifdef WITH_WSREP
+ if (!wsrep_on(thd) || wsrep_thd_exec_mode(thd) == LOCAL_STATE)
+ {
+#endif /* WITH_WSREP */
my_error(ER_BINLOG_STMT_MODE_AND_ROW_ENGINE, MYF(0),
" InnoDB is limited to row-logging when "
"transaction isolation level is "
"READ COMMITTED or READ UNCOMMITTED.");
DBUG_RETURN(HA_ERR_LOGGING_IMPOSSIBLE);
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
}
}
@@ -10707,6 +11750,9 @@ innobase_xa_prepare(
to the session variable take effect only in the next transaction */
if (!trx->support_xa) {
+#ifdef WITH_WSREP
+ thd_get_xid(thd, (MYSQL_XID*) &trx->xid);
+#endif // WITH_WSREP
return(0);
}
@@ -11565,6 +12611,304 @@ static SHOW_VAR innodb_status_variables_export[]= {
static struct st_mysql_storage_engine innobase_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
+#ifdef WITH_WSREP
+/** Logs a fatal error that names both transaction seqnos and then terminates
+the process with abort(); this function does not return.
+@param bf_seqno seqno printed as the transaction attempting the abort
+@param victim_seqno seqno printed as the slave transaction being aborted */
+void
+wsrep_abort_slave_trx(wsrep_seqno_t bf_seqno, wsrep_seqno_t victim_seqno)
+{
+ WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be "
+ "caused by:\n\t"
+ "1) unsupported configuration options combination, please check documentation.\n\t"
+ "2) a bug in the code.\n\t"
+ "3) a database corruption.\n Node consistency compromized, "
+ "need to abort. Restart the node to resync with cluster.",
+ (long long)bf_seqno, (long long)victim_seqno);
+ abort();
+}
+
+/** Marks a victim transaction for abort on behalf of a BF transaction and
+wakes up or cancels the victim according to its current wsrep query state.
+The victim THD is locked with wsrep_thd_LOCK() while its state is inspected
+and is unlocked again on every path that returns after the lock was taken.
+Several paths terminate the process via abort() / wsrep_abort_slave_trx()
+instead of returning.
+@return 1 if the victim or BF THD is missing, or if abort_pre_commit()
+returned WSREP_WARNING; 0 on every other returning path */
+int
+wsrep_innobase_kill_one_trx(
+ void *bf_thd_ptr, /*!< in: BF thd */
+ trx_t *bf_trx, /*!< in: BF trx */
+ trx_t *victim_trx, /*!< in: victim trx */
+ ibool signal, /*!< in: signal to be used */
+ ibool have_kernel_mutex) /*!<in: do we own kernel mutex */
+{
+ DBUG_ENTER("wsrep_innobase_kill_one_trx");
+ THD *bf_thd = (THD *)bf_thd_ptr;
+ THD *thd = (THD *) victim_trx->mysql_thd;
+ int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0;
+
+ if (have_kernel_mutex) {
+ ut_ad(mutex_own(&kernel_mutex));
+ }
+
+ /* No BF THD passed in: fall back to the THD attached to bf_trx, if any.
+ Note that bf_seqno above was already computed from the original pointer. */
+ if (!bf_thd) bf_thd = (bf_trx) ? (THD *)bf_trx->mysql_thd : NULL;
+
+ if (!thd) {
+ DBUG_PRINT("wsrep", ("no thd for conflicting lock"));
+ WSREP_WARN("no THD for trx: %llu", victim_trx->id);
+ DBUG_RETURN(1);
+ }
+ if (!bf_thd) {
+ DBUG_PRINT("wsrep", ("no BF thd for conflicting lock"));
+ WSREP_WARN("no BF THD for trx: %llu", (bf_trx) ? bf_trx->id : 0);
+ DBUG_RETURN(1);
+ }
+
+ WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
+
+ WSREP_DEBUG("BF kill (%lu, seqno: %lld), victim: (%lu) trx: %llu",
+ signal, (long long)bf_seqno,
+ wsrep_thd_thread_id(thd),
+ victim_trx->id);
+
+ WSREP_DEBUG("Aborting query: %s",
+ (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void");
+
+ /* From here on every return must release the victim THD lock. */
+ wsrep_thd_LOCK(thd);
+
+ if (wsrep_thd_query_state(thd) == QUERY_EXITING) {
+ WSREP_DEBUG("kill trx EXITING for %llu", victim_trx->id);
+ wsrep_thd_UNLOCK(thd);
+ DBUG_RETURN(0);
+ }
+ if(wsrep_thd_exec_mode(thd) != LOCAL_STATE) {
+ WSREP_DEBUG("withdraw for BF trx: %llu, state: %d",
+ victim_trx->id,
+ wsrep_thd_conflict_state(thd));
+ }
+
+ /* Only a victim in NO_CONFLICT state continues past this switch; it is
+ moved to MUST_ABORT. Every other conflict state unlocks and returns 0. */
+ switch (wsrep_thd_conflict_state(thd)) {
+ case NO_CONFLICT:
+ wsrep_thd_set_conflict_state(thd, MUST_ABORT);
+ break;
+ case MUST_ABORT:
+ WSREP_DEBUG("victim %llu in MUST ABORT state",
+ victim_trx->id);
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(bf_thd, thd, signal);
+ DBUG_RETURN(0);
+ break;
+ case ABORTED:
+ case ABORTING: // fall through
+ default:
+ WSREP_DEBUG("victim %llu in state %d",
+ victim_trx->id, wsrep_thd_conflict_state(thd));
+ wsrep_thd_UNLOCK(thd);
+ DBUG_RETURN(0);
+ break;
+ }
+
+ /* Dispatch on what the victim is currently doing. */
+ switch (wsrep_thd_query_state(thd)) {
+ case QUERY_COMMITTING:
+ enum wsrep_status rcode;
+
+ WSREP_DEBUG("kill query for: %ld",
+ wsrep_thd_thread_id(thd));
+ wsrep_thd_awake(bf_thd, thd, signal);
+ WSREP_DEBUG("kill trx QUERY_COMMITTING for %llu",
+ victim_trx->id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
+ wsrep_abort_slave_trx(bf_seqno,
+ wsrep_thd_trx_seqno(thd));
+ } else {
+ rcode = wsrep->abort_pre_commit(
+ wsrep, bf_seqno,
+ (wsrep_trx_id_t)victim_trx->id
+ );
+
+ switch (rcode) {
+ case WSREP_WARNING:
+ WSREP_DEBUG("cancel commit warning: %llu",
+ victim_trx->id);
+ wsrep_thd_UNLOCK(thd);
+ DBUG_RETURN(1);
+ break;
+ case WSREP_OK:
+ break;
+ default:
+ WSREP_ERROR(
+ "cancel commit bad exit: %d %llu",
+ rcode,
+ victim_trx->id);
+ /* unable to interrupt, must abort */
+ /* note: kill_mysql() will block, if we cannot.
+ * kill the lock holder first.
+ */
+ abort();
+ break;
+ }
+ }
+ break;
+ case QUERY_EXEC:
+ /* it is possible that victim trx is itself waiting for some
+ * other lock. We need to cancel this waiting
+ */
+ WSREP_DEBUG("kill trx QUERY_EXEC for %llu", victim_trx->id);
+
+ victim_trx->was_chosen_as_deadlock_victim= TRUE;
+ if (victim_trx->wait_lock) {
+ WSREP_DEBUG("victim has wait flag: %ld",
+ wsrep_thd_thread_id(thd));
+ lock_t* wait_lock = victim_trx->wait_lock;
+ /* NOTE(review): this re-reads victim_trx->wait_lock, which the
+ enclosing if just tested, and sets the victim flag a second time. */
+ if (wait_lock) {
+ WSREP_DEBUG("canceling wait lock");
+ victim_trx->was_chosen_as_deadlock_victim= TRUE;
+ if (!have_kernel_mutex) {
+ mutex_enter(&kernel_mutex);
+ }
+ lock_cancel_waiting_and_release(wait_lock);
+ /* If we already have kernel mutex when we
+ arrived to this function, do not yet release
+ it */
+ if (!have_kernel_mutex) {
+ mutex_exit(&kernel_mutex);
+ }
+ }
+
+ wsrep_thd_awake(bf_thd, thd, signal);
+ } else {
+ /* abort currently executing query */
+ DBUG_PRINT("wsrep",("sending KILL_QUERY to: %ld",
+ wsrep_thd_thread_id(thd)));
+ WSREP_DEBUG("kill query for: %ld",
+ wsrep_thd_thread_id(thd));
+ wsrep_thd_awake(bf_thd, thd, signal);
+
+ /* for BF thd, we need to prevent him from committing */
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
+ wsrep_abort_slave_trx(bf_seqno,
+ wsrep_thd_trx_seqno(thd));
+ }
+ }
+ break;
+ case QUERY_IDLE:
+ {
+ bool skip_abort= false;
+ wsrep_aborting_thd_t abortees;
+
+ WSREP_DEBUG("kill IDLE for %llu", victim_trx->id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
+ WSREP_DEBUG("kill BF IDLE, seqno: %lld",
+ (long long)wsrep_thd_trx_seqno(thd));
+ wsrep_thd_UNLOCK(thd);
+ wsrep_abort_slave_trx(bf_seqno,
+ wsrep_thd_trx_seqno(thd));
+ DBUG_RETURN(0);
+ }
+ /* This will lock thd from proceeding after net_read() */
+ wsrep_thd_set_conflict_state(thd, ABORTING);
+
+ /* The wsrep_aborting_thd list is walked and modified only while
+ LOCK_wsrep_rollback is held. */
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+
+ abortees = wsrep_aborting_thd;
+ while (abortees && !skip_abort) {
+ /* check if we have a kill message for this already */
+ if (abortees->aborting_thd == thd) {
+ skip_abort = true;
+ WSREP_WARN("duplicate thd aborter %lu",
+ wsrep_thd_thread_id(thd));
+ }
+ abortees = abortees->next;
+ }
+ if (!skip_abort) {
+ /* NOTE(review): the my_malloc() result is dereferenced below
+ without a NULL check. */
+ wsrep_aborting_thd_t aborting = (wsrep_aborting_thd_t)
+ my_malloc(sizeof(struct wsrep_aborting_thd),
+ MYF(0));
+ aborting->aborting_thd = thd;
+ aborting->next = wsrep_aborting_thd;
+ wsrep_aborting_thd = aborting;
+ DBUG_PRINT("wsrep",("enqueuing trx abort for %lu",
+ wsrep_thd_thread_id(thd)));
+ WSREP_DEBUG("enqueuing trx abort for (%lu)",
+ wsrep_thd_thread_id(thd));
+ }
+
+ DBUG_PRINT("wsrep",("signalling wsrep rollbacker"));
+ WSREP_DEBUG("signaling aborter");
+ mysql_cond_signal(&COND_wsrep_rollback);
+ mysql_mutex_unlock(&LOCK_wsrep_rollback);
+
+ break;
+ }
+ default:
+ WSREP_WARN("bad wsrep query state: %d",
+ wsrep_thd_query_state(thd));
+ break;
+ }
+ wsrep_thd_UNLOCK(thd);
+
+ DBUG_RETURN(0);
+}
+/** Aborts the victim thread's transaction on behalf of a BF thread.
+If the victim has an InnoDB transaction, it is handed to
+wsrep_innobase_kill_one_trx() (kernel mutex not held) and any wait handled by
+wsrep_srv_conc_cancel_wait() is cancelled. Otherwise the victim THD is only
+flagged MUST_ABORT and woken up.
+@param hton		unused
+@param bf_thd		BF thread handle; may be NULL
+@param victim_thd	thread whose transaction is to be aborted
+@param signal		forwarded to the kill / awake calls
+@return result of wsrep_innobase_kill_one_trx(), or -1 if the victim has no
+InnoDB transaction */
+static int
+wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
+			my_bool signal)
+{
+	/* The DBUG tag must match the function name so that trace output and
+	function-based DBUG filters refer to this function. */
+	DBUG_ENTER("wsrep_abort_transaction");
+	trx_t*	victim_trx	= thd_to_trx(victim_thd);
+	trx_t*	bf_trx		= (bf_thd) ? thd_to_trx(bf_thd) : NULL;
+
+	ut_ad(!mutex_own(&kernel_mutex));
+
+	WSREP_DEBUG("abort transaction: BF: %s victim: %s",
+		    wsrep_thd_query(bf_thd),
+		    wsrep_thd_query(victim_thd));
+
+	if (victim_trx) {
+		int rcode = wsrep_innobase_kill_one_trx(
+			bf_thd, bf_trx, victim_trx, signal, FALSE);
+		wsrep_srv_conc_cancel_wait(victim_trx);
+		DBUG_RETURN(rcode);
+	}
+
+	/* No InnoDB transaction: just flag the THD and wake it up. */
+	WSREP_DEBUG("victim does not have transaction");
+	wsrep_thd_LOCK(victim_thd);
+	wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT);
+	wsrep_thd_UNLOCK(victim_thd);
+	wsrep_thd_awake(bf_thd, victim_thd, signal);
+
+	DBUG_RETURN(-1);
+}
+
+/** Writes a wsrep checkpoint XID to the trx system header inside its own
+mini-transaction and then calls innobase_flush_logs().
+@param hton	must be the InnoDB handlerton (debug-asserted)
+@param xid	XID to store
+@return 0 if xid is a wsrep XID and was stored, 1 if it is not a wsrep XID */
+static int innobase_wsrep_set_checkpoint(handlerton* hton, const XID* xid)
+{
+	DBUG_ASSERT(hton == innodb_hton_ptr);
+
+	/* Anything that is not a wsrep XID is rejected untouched. */
+	if (!wsrep_is_wsrep_xid(xid)) {
+		return 1;
+	}
+
+	mtr_t	mtr;
+
+	mtr_start(&mtr);
+	trx_sys_update_wsrep_checkpoint(xid, trx_sysf_get(&mtr), &mtr);
+	mtr_commit(&mtr);
+
+	innobase_flush_logs(hton);
+
+	return 0;
+}
+
+/** Fills *xid by calling trx_sys_read_wsrep_checkpoint().
+@param hton must be the InnoDB handlerton (debug-asserted)
+@param xid out: receives the stored WSREP XID
+@return always 0 */
+static int innobase_wsrep_get_checkpoint(handlerton* hton, XID* xid)
+{
+ DBUG_ASSERT(hton == innodb_hton_ptr);
+ trx_sys_read_wsrep_checkpoint(xid);
+ return 0;
+}
+
+/** Obtains a new InnoDB transaction id under kernel_mutex and passes it,
+together with the thread's wsrep write-set handle, to
+wsrep_ws_handle_for_trx(). */
+static void
+wsrep_fake_trx_id(
+/*==================*/
+	handlerton	*hton,	/*!< in: unused */
+	THD		*thd)	/*!< in: user thread handle */
+{
+	mutex_enter(&kernel_mutex);
+	trx_id_t	trx_id = trx_sys_get_new_trx_id();
+	mutex_exit(&kernel_mutex);
+
+	/* The return value is deliberately discarded; the cast must be to
+	void, not void*, to express that. */
+	(void) wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd), trx_id);
+}
+
+#endif /* WITH_WSREP */
/* plugin options */
static MYSQL_SYSVAR_BOOL(checksums, innobase_use_checksums,
PLUGIN_VAR_NOCMDARG | PLUGIN_VAR_READONLY,
@@ -11917,6 +13261,40 @@ static MYSQL_SYSVAR_UINT(change_buffering_debug, ibuf_debug,
NULL, NULL, 0, 0, 2, 0);
#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
+#ifdef WITH_INNODB_DISALLOW_WRITES
+/*******************************************************
+ * innobase_disallow_writes variable definition *
+ *******************************************************/
+
+/* Must always init to FALSE. */
+static my_bool innobase_disallow_writes = FALSE;
+
+/**************************************************************************
+Update callback for the innobase_disallow_writes variable: stores the new
+value and resets srv_allow_writes_event when writes become disallowed, or
+sets it when they become allowed again. */
+static
+void
+innobase_disallow_writes_update(
+/*============================*/
+	THD*			thd,		/* in: thread handle */
+	st_mysql_sys_var*	var,		/* in: pointer to system
+						variable */
+	void*			var_ptr,	/* out: pointer to dynamic
+						variable */
+	const void*		save)		/* in: temporary storage */
+{
+	const my_bool	disallow = *(const my_bool*) save;
+
+	*(my_bool*) var_ptr = disallow;
+
+	ut_a(srv_allow_writes_event);
+
+	if (disallow) {
+		os_event_reset(srv_allow_writes_event);
+	} else {
+		os_event_set(srv_allow_writes_event);
+	}
+}
+
+/* Declares the boolean system variable "disallow_writes", backed by
+innobase_disallow_writes, passing innobase_disallow_writes_update as the
+last callback argument and FALSE as the final argument.
+NOTE(review): presumably (check = NULL, update = callback, default = FALSE)
+per the usual MYSQL_SYSVAR_BOOL argument order - confirm against plugin.h. */
+static MYSQL_SYSVAR_BOOL(disallow_writes, innobase_disallow_writes,
+ PLUGIN_VAR_NOCMDOPT,
+ "Tell InnoDB to stop any writes to disk",
+ NULL, innobase_disallow_writes_update, FALSE);
+#endif /* WITH_INNODB_DISALLOW_WRITES */
static MYSQL_SYSVAR_BOOL(random_read_ahead, srv_random_read_ahead,
PLUGIN_VAR_NOCMDARG,
"Whether to use read ahead for random access within an extent.",
@@ -12020,6 +13398,9 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
#if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
MYSQL_SYSVAR(change_buffering_debug),
#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
+#ifdef WITH_INNODB_DISALLOW_WRITES
+ MYSQL_SYSVAR(disallow_writes),
+#endif /* WITH_INNODB_DISALLOW_WRITES */
MYSQL_SYSVAR(random_read_ahead),
MYSQL_SYSVAR(read_ahead_threshold),
MYSQL_SYSVAR(io_capacity),
diff --git a/storage/innobase/handler/ha_innodb.h b/storage/innobase/handler/ha_innodb.h
index 42aae4dc20e..8034f4dd535 100644
--- a/storage/innobase/handler/ha_innodb.h
+++ b/storage/innobase/handler/ha_innodb.h
@@ -114,6 +114,10 @@ class ha_innobase: public handler
dict_index_t* innobase_get_index(uint keynr);
int info_low(uint flag, bool called_from_analyze);
+#ifdef WITH_WSREP
+ int wsrep_append_keys(THD *thd, bool shared,
+ const uchar* record0, const uchar* record1);
+#endif
/* Init values for the class: */
public:
ha_innobase(handlerton *hton, TABLE_SHARE *table_arg);
@@ -293,6 +297,40 @@ bool thd_sqlcom_can_generate_row_events(const MYSQL_THD thd);
*/
extern void mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file);
+#ifdef WITH_WSREP
+#include <wsrep_mysqld.h>
+//extern "C" int wsrep_trx_order_before(void *thd1, void *thd2);
+
+extern "C" bool wsrep_thd_is_wsrep_on(THD *thd);
+
+extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd);
+extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd);
+extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd);
+extern "C" const char * wsrep_thd_exec_mode_str(THD *thd);
+extern "C" const char * wsrep_thd_conflict_state_str(THD *thd);
+extern "C" const char * wsrep_thd_query_state_str(THD *thd);
+extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd);
+
+extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode);
+extern "C" void wsrep_thd_set_query_state(
+ THD *thd, enum wsrep_query_state state);
+extern "C" void wsrep_thd_set_conflict_state(
+ THD *thd, enum wsrep_conflict_state state);
+
+extern "C" void wsrep_thd_set_trx_to_replay(THD *thd, uint64 trx_id);
+
+extern "C"void wsrep_thd_LOCK(THD *thd);
+extern "C"void wsrep_thd_UNLOCK(THD *thd);
+extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd);
+extern "C" time_t wsrep_thd_query_start(THD *thd);
+extern "C" my_thread_id wsrep_thd_thread_id(THD *thd);
+extern "C" int64_t wsrep_thd_trx_seqno(THD *thd);
+extern "C" query_id_t wsrep_thd_query_id(THD *thd);
+extern "C" char * wsrep_thd_query(THD *thd);
+extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd);
+extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id);
+extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal);
+#endif
typedef struct trx_struct trx_t;
/********************************************************************//**
@file handler/ha_innodb.h
@@ -333,3 +371,6 @@ innobase_index_name_is_reserved(
ulint num_of_keys); /*!< in: Number of indexes to
be created. */
+#ifdef WITH_WSREP
+extern "C" int wsrep_trx_is_aborting(void *thd_ptr);
+#endif
diff --git a/storage/innobase/handler/handler0alter.cc b/storage/innobase/handler/handler0alter.cc
index 735d9fb95d0..9afa840f8d8 100644
--- a/storage/innobase/handler/handler0alter.cc
+++ b/storage/innobase/handler/handler0alter.cc
@@ -36,6 +36,10 @@ extern "C" {
#include "handler0alter.h"
}
+#ifdef WITH_WSREP
+//#include "wsrep_api.h"
+#include <sql_acl.h> // PROCESS_ACL
+#endif
#include "ha_innodb.h"
/*************************************************************//**
diff --git a/storage/innobase/include/dict0mem.h b/storage/innobase/include/dict0mem.h
index a58bb914be2..daa3c35ebfb 100644
--- a/storage/innobase/include/dict0mem.h
+++ b/storage/innobase/include/dict0mem.h
@@ -344,6 +344,9 @@ barracuda format, the length could be REC_VERSION_56_MAX_INDEX_COL_LEN
/** Defines the maximum fixed length column size */
#define DICT_MAX_FIXED_COL_LEN DICT_ANTELOPE_MAX_INDEX_COL_LEN
+#ifdef WITH_WSREP
+#define WSREP_MAX_SUPPORTED_KEY_LENGTH 3500
+#endif /* WITH_WSREP */
/** Data structure for a field in an index */
struct dict_field_struct{
diff --git a/storage/innobase/include/ha_prototypes.h b/storage/innobase/include/ha_prototypes.h
index 3859b45e8d1..e72da36b2e0 100644
--- a/storage/innobase/include/ha_prototypes.h
+++ b/storage/innobase/include/ha_prototypes.h
@@ -285,6 +285,19 @@ thd_set_lock_wait_time(
void* thd, /*!< in: thread handle (THD*) */
ulint value); /*!< in: time waited for the lock */
+#ifdef WITH_WSREP
+UNIV_INTERN int wsrep_innobase_kill_one_trx(void *thd, trx_t *bf_trx, trx_t *victim_trx,
+ ibool signal, ibool have_kernel_mutex);
+my_bool wsrep_thd_is_BF(void *thd_ptr, my_bool sync);
+//int64_t wsrep_thd_trx_seqno(THD *thd);
+int wsrep_trx_order_before(void *thd1, void *thd2);
+int wsrep_innobase_mysql_sort(int mysql_type, uint charset_number,
+ unsigned char* str, unsigned int str_length,
+ unsigned int buf_length);
+int wsrep_on(void *thd_ptr);
+int wsrep_is_wsrep_xid(const void*);
+my_bool wsrep_thd_set_PA_safe(void *thd_ptr, my_bool safe);
+#endif /* WITH_WSREP */
/**********************************************************************//**
Get the current setting of the lower_case_table_names global parameter from
mysqld.cc. We do a dirty read because for one there is no synchronization
diff --git a/storage/innobase/include/lock0lock.h b/storage/innobase/include/lock0lock.h
index 01439835aa5..237e43b7882 100644
--- a/storage/innobase/include/lock0lock.h
+++ b/storage/innobase/include/lock0lock.h
@@ -799,9 +799,6 @@ lock_rec_get_page_no(
record */
#define LOCK_CONV_BY_OTHER 4096 /*!< this bit is set when the lock is created
by other transaction */
-#if (LOCK_WAIT|LOCK_GAP|LOCK_REC_NOT_GAP|LOCK_INSERT_INTENTION|LOCK_CONV_BY_OTHER)&LOCK_MODE_MASK
-# error
-#endif
#if (LOCK_WAIT|LOCK_GAP|LOCK_REC_NOT_GAP|LOCK_INSERT_INTENTION|LOCK_CONV_BY_OTHER)&LOCK_TYPE_MASK
# error
#endif
diff --git a/storage/innobase/include/rem0rec.h b/storage/innobase/include/rem0rec.h
index 9dd96f609ea..7e76b4f40cb 100644
--- a/storage/innobase/include/rem0rec.h
+++ b/storage/innobase/include/rem0rec.h
@@ -836,6 +836,15 @@ are given in one byte (resp. two byte) format. */
two upmost bits in a two byte offset for special purposes */
#define REC_MAX_DATA_SIZE (16 * 1024)
+#ifdef WITH_WSREP
+int wsrep_rec_get_foreign_key(
+ byte *buf, /* out: extracted key */
+ ulint *buf_len, /* in/out: length of buf */
+ const rec_t* rec, /* in: physical record */
+ dict_index_t* index_for, /* in: index for foreign table */
+ dict_index_t* index_ref, /* in: index for referenced table */
+ ibool new_protocol); /* in: protocol > 1 */
+#endif /* WITH_WSREP */
#ifndef UNIV_NONINL
#include "rem0rec.ic"
#endif
diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h
index e48aca84317..c95799f4478 100644
--- a/storage/innobase/include/srv0srv.h
+++ b/storage/innobase/include/srv0srv.h
@@ -142,6 +142,10 @@ extern ulint srv_log_buffer_size;
extern ulong srv_flush_log_at_trx_commit;
extern char srv_adaptive_flushing;
+#ifdef WITH_INNODB_DISALLOW_WRITES
+/* When this event is reset we do not allow any file writes to take place. */
+extern os_event_t srv_allow_writes_event;
+#endif /* WITH_INNODB_DISALLOW_WRITES */
/* If this flag is TRUE, then we will load the indexes' (and tables') metadata
even if they are marked as "corrupted". Mostly it is for DBA to process
corrupted index and table */
@@ -578,6 +582,14 @@ srv_conc_enter_innodb(
/*==================*/
trx_t* trx); /*!< in: transaction object associated with the
thread */
+#ifdef WITH_WSREP
+UNIV_INTERN
+void
+wsrep_srv_conc_cancel_wait(
+/*==================*/
+ trx_t* trx); /*!< in: transaction object associated with the
+ thread */
+#endif /* WITH_WSREP */
/*********************************************************************//**
This lets a thread enter InnoDB regardless of the number of threads inside
InnoDB. This must be called when a thread ends a lock wait. */
diff --git a/storage/innobase/include/trx0sys.h b/storage/innobase/include/trx0sys.h
index 69b87e18010..b8f1973d9c5 100644
--- a/storage/innobase/include/trx0sys.h
+++ b/storage/innobase/include/trx0sys.h
@@ -41,6 +41,9 @@ Created 3/26/1996 Heikki Tuuri
#include "ut0bh.h"
#include "read0types.h"
#include "page0types.h"
+#ifdef WITH_WSREP
+#include "trx0xa.h"
+#endif /* WITH_WSREP */
/** In a MySQL replication slave, in crash recovery we store the master log
file name and position here. */
@@ -306,6 +309,9 @@ trx_sys_update_mysql_binlog_offset(
ib_int64_t offset, /*!< in: position in that log file */
ulint field, /*!< in: offset of the MySQL log info field in
the trx sys header */
+#ifdef WITH_WSREP
+ trx_sysf_t* sys_header, /*!< in: trx sys header */
+#endif /* WITH_WSREP */
mtr_t* mtr); /*!< in: mtr */
/*****************************************************************//**
Prints to stderr the MySQL binlog offset info in the trx system header if
@@ -314,6 +320,19 @@ UNIV_INTERN
void
trx_sys_print_mysql_binlog_offset(void);
/*===================================*/
+#ifdef WITH_WSREP
+/** Update WSREP checkpoint XID in sys header. */
+void
+trx_sys_update_wsrep_checkpoint(
+ const XID* xid, /*!< in: WSREP XID */
+ trx_sysf_t* sys_header, /*!< in: sys_header */
+ mtr_t* mtr); /*!< in: mtr */
+
+void
+/** Read WSREP checkpoint XID from sys header. */
+trx_sys_read_wsrep_checkpoint(
+ XID* xid); /*!< out: WSREP XID */
+#endif /* WITH_WSREP */
/*****************************************************************//**
Prints to stderr the MySQL master log offset info in the trx system header if
the magic number shows it valid. */
@@ -519,6 +538,22 @@ this contains the same fields as TRX_SYS_MYSQL_LOG_INFO below */
within that file */
#define TRX_SYS_MYSQL_LOG_NAME 12 /*!< MySQL log file name */
+#ifdef WITH_WSREP
+/* We hijack TRX_SYS_MYSQL_MASTER_LOG_INFO, it seems to be completely unused
+ otherwise (see comments for MySQL bug #34058). */
+/** */
+#define TRX_SYS_WSREP_XID_INFO TRX_SYS_MYSQL_MASTER_LOG_INFO
+#define TRX_SYS_WSREP_XID_MAGIC_N_FLD 0
+#define TRX_SYS_WSREP_XID_MAGIC_N 0x77737265
+
+/* XID field: formatID, gtrid_len, bqual_len, xid_data */
+#define TRX_SYS_WSREP_XID_LEN (4 + 4 + 4 + XIDDATASIZE)
+#define TRX_SYS_WSREP_XID_FORMAT 4
+#define TRX_SYS_WSREP_XID_GTRID_LEN 8
+#define TRX_SYS_WSREP_XID_BQUAL_LEN 12
+#define TRX_SYS_WSREP_XID_DATA 16
+#endif /* WITH_WSREP*/
+
/** Doublewrite buffer */
/* @{ */
/** The offset of the doublewrite buffer header on the trx system header page */
diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h
index 4ade245f03e..0d23ede46b6 100644
--- a/storage/innobase/include/trx0trx.h
+++ b/storage/innobase/include/trx0trx.h
@@ -729,6 +729,9 @@ struct trx_struct{
/*------------------------------*/
char detailed_error[256]; /*!< detailed error message for last
error, or empty. */
+#ifdef WITH_WSREP
+ os_event_t wsrep_event; /* event waited for in srv_conc_slot */
+#endif /* WITH_WSREP */
};
#define TRX_MAX_N_THREADS 32 /* maximum number of
diff --git a/storage/innobase/lock/lock0lock.c b/storage/innobase/lock/lock0lock.c
index e6ce07428e8..98cb239befc 100644
--- a/storage/innobase/lock/lock0lock.c
+++ b/storage/innobase/lock/lock0lock.c
@@ -40,6 +40,10 @@ Created 5/7/1996 Heikki Tuuri
#include "trx0sys.h"
#include "btr0btr.h"
+#ifdef WITH_WSREP
+extern my_bool wsrep_debug;
+extern my_bool wsrep_log_conflicts;
+#endif
/* Restricts the length of search we will do in the waits-for
graph of transactions */
#define LOCK_MAX_N_STEPS_IN_DEADLOCK_CHECK 1000000
@@ -905,6 +909,9 @@ UNIV_INLINE
ibool
lock_rec_has_to_wait(
/*=================*/
+#ifdef WITH_WSREP
+ ibool for_locking, /*!< is caller locking or releasing */
+#endif /* WITH_WSREP */
const trx_t* trx, /*!< in: trx of new lock */
ulint type_mode,/*!< in: precise mode of the new lock
to set: LOCK_S or LOCK_X, possibly
@@ -974,6 +981,44 @@ lock_rec_has_to_wait(
return(FALSE);
}
+#ifdef WITH_WSREP
+ /* if BF thread is locking and has conflict with another BF
+ thread, we need to look at trx ordering and lock types */
+ if (for_locking &&
+ wsrep_thd_is_BF(trx->mysql_thd, FALSE) &&
+ wsrep_thd_is_BF(lock2->trx->mysql_thd, TRUE)) {
+
+ if (wsrep_debug) {
+ fprintf(stderr, "\n BF-BF lock conflict \n");
+ lock_rec_print(stderr, lock2);
+ }
+
+ if (wsrep_trx_order_before(trx->mysql_thd,
+ lock2->trx->mysql_thd) &&
+ (type_mode & LOCK_MODE_MASK) == LOCK_X &&
+ (lock2->type_mode & LOCK_MODE_MASK) == LOCK_X)
+ {
+ /* exclusive lock conflicts are not accepted */
+ fprintf(stderr, "BF-BF X lock conflict\n");
+ lock_rec_print(stderr, lock2);
+ abort();
+ } else {
+ /* if lock2->index->n_uniq <=
+ lock2->index->n_user_defined_cols
+ operation is on uniq index
+ */
+ if (wsrep_debug) fprintf(stderr,
+ "BF conflict, modes: %lu %lu, "
+ "idx: %s-%s n_uniq %u n_user %u\n",
+ type_mode, lock2->type_mode,
+ lock2->index->name,
+ lock2->index->table_name,
+ lock2->index->n_uniq,
+ lock2->index->n_user_defined_cols);
+ return FALSE;
+ }
+ }
+#endif /* WITH_WSREP */
return(TRUE);
}
@@ -1004,7 +1049,11 @@ lock_has_to_wait(
/* If this lock request is for a supremum record
then the second bit on the lock bitmap is set */
+#ifdef WITH_WSREP
+ return(lock_rec_has_to_wait(FALSE, lock1->trx,
+#else
return(lock_rec_has_to_wait(lock1->trx,
+#endif /* WITH_WSREP */
lock1->type_mode, lock2,
lock_rec_get_nth_bit(
lock1, 1)));
@@ -1454,6 +1503,11 @@ lock_rec_has_expl(
return(NULL);
}
+#ifdef WITH_WSREP
+static
+void
+lock_rec_discard(lock_t* in_lock);
+#endif
#ifdef UNIV_DEBUG
/*********************************************************************//**
Checks if some other transaction has a lock request in the queue.
@@ -1503,6 +1557,53 @@ lock_rec_other_has_expl_req(
}
#endif /* UNIV_DEBUG */
+#ifdef WITH_WSREP
+/** Lets a BF transaction kill the owner of a conflicting lock.
+Nothing happens unless the requesting trx is BF and the lock owner is either
+not BF or is a BF transaction that the requester is ordered before. A lock
+owner that is itself in TRX_QUE_LOCK_WAIT is not killed here. Otherwise the
+conflict is optionally printed to stderr and wsrep_innobase_kill_one_trx() is
+called with have_kernel_mutex = TRUE, so the caller is expected to hold
+kernel_mutex.
+@param trx	transaction requesting the lock
+@param lock	conflicting lock held by another transaction */
+static void
+wsrep_kill_victim(trx_t *trx, lock_t *lock) {
+	my_bool	bf_this  = wsrep_thd_is_BF(trx->mysql_thd, FALSE);
+	my_bool	bf_other = wsrep_thd_is_BF(lock->trx->mysql_thd, TRUE);
+
+	/* Only a BF requester may kill, and it may kill another BF
+	transaction only when it is ordered before that transaction. */
+	if (!bf_this
+	    || (bf_other
+		&& !wsrep_trx_order_before(
+			trx->mysql_thd, lock->trx->mysql_thd))) {
+		return;
+	}
+
+	if (lock->trx->que_state == TRX_QUE_LOCK_WAIT) {
+		if (wsrep_debug) {
+			fprintf(stderr, "WSREP: BF victim waiting\n");
+		}
+		/* cannot release lock, until our lock
+		is in the queue*/
+		return;
+	}
+
+	if (lock->trx == trx) {
+		return;
+	}
+
+	if (wsrep_log_conflicts) {
+		/* The requester is always BF at this point, so it is always
+		reported as the priority transaction. */
+		fputs("\n*** Priority TRANSACTION:\n", stderr);
+		trx_print(stderr, trx, 3000);
+
+		if (bf_other) {
+			fputs("\n*** Priority TRANSACTION:\n", stderr);
+		} else {
+			fputs("\n*** Victim TRANSACTION:\n", stderr);
+		}
+		trx_print(stderr, lock->trx, 3000);
+
+		fputs("*** WAITING FOR THIS LOCK TO BE GRANTED:\n", stderr);
+
+		if (lock_get_type(lock) == LOCK_REC) {
+			lock_rec_print(stderr, lock);
+		} else {
+			lock_table_print(stderr, lock);
+		}
+	}
+
+	wsrep_innobase_kill_one_trx(
+		trx->mysql_thd, trx, lock->trx, TRUE, TRUE);
+}
+#endif
/*********************************************************************//**
Checks if some other transaction has a conflicting explicit lock request
in the queue, so that we have to wait.
@@ -1530,8 +1631,15 @@ lock_rec_other_has_conflicting(
if (UNIV_UNLIKELY(heap_no == PAGE_HEAP_NO_SUPREMUM)) {
do {
- if (lock_rec_has_to_wait(trx, mode, lock,
+#ifdef WITH_WSREP
+ if (lock_rec_has_to_wait(TRUE, trx, mode, lock,
+#else
+ if (lock_rec_has_to_wait(trx, mode, lock,
+#endif /* WITH_WSREP */
TRUE)) {
+#ifdef WITH_WSREP
+ wsrep_kill_victim(trx, lock);
+#endif
return(lock);
}
@@ -1540,8 +1648,15 @@ lock_rec_other_has_conflicting(
} else {
do {
- if (lock_rec_has_to_wait(trx, mode, lock,
+#ifdef WITH_WSREP
+ if (lock_rec_has_to_wait(TRUE, trx, mode, lock,
+#else
+ if (lock_rec_has_to_wait(trx, mode, lock,
+#endif /* WITH_WSREP */
FALSE)) {
+#ifdef WITH_WSREP
+ wsrep_kill_victim(trx, lock);
+#endif
return(lock);
}
@@ -1673,6 +1788,9 @@ static
lock_t*
lock_rec_create(
/*============*/
+#ifdef WITH_WSREP
+ lock_t* c_lock, /* conflicting lock */
+#endif
ulint type_mode,/*!< in: lock mode and wait
flag, type is ignored and
replaced by LOCK_REC */
@@ -1732,8 +1850,66 @@ lock_rec_create(
/* Set the bit corresponding to rec */
lock_rec_set_nth_bit(lock, heap_no);
+#ifdef WITH_WSREP
+ if (c_lock && wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
+ lock_t *hash = c_lock->hash;
+ lock_t *prev = NULL;
+
+ while (hash &&
+ wsrep_thd_is_BF(hash->trx->mysql_thd, TRUE) &&
+ wsrep_trx_order_before(
+ hash->trx->mysql_thd, trx->mysql_thd))
+ {
+ prev = hash;
+ hash = hash->hash;
+ }
+ lock->hash = hash;
+ if (prev) {
+ prev->hash = lock;
+ } else {
+ c_lock->hash = lock;
+ }
+ /*
+ * delayed conflict resolution '...kill_one_trx' was not called,
+ * if victim was waiting for some other lock
+ */
+ if (c_lock->trx->que_state == TRX_QUE_LOCK_WAIT) {
+ c_lock->trx->was_chosen_as_deadlock_victim = TRUE;
+
+ if (wsrep_debug && c_lock->trx->wait_lock != c_lock) {
+ fprintf(stderr, "WSREP: c_lock != wait lock\n");
+ lock_rec_print(stderr, c_lock);
+ lock_rec_print(stderr, c_lock->trx->wait_lock);
+ }
+
+ trx->que_state = TRX_QUE_LOCK_WAIT;
+ lock_set_lock_and_trx_wait(lock, trx);
+
+ lock_cancel_waiting_and_release(c_lock->trx->wait_lock);
+
+ /* trx might not wait for c_lock, but some other lock
+ does not matter if wait_lock was released above
+ */
+ if (c_lock->trx->wait_lock == c_lock) {
+ lock_reset_lock_and_trx_wait(lock);
+ }
+
+ if (wsrep_debug) fprintf(
+ stderr,
+ "WSREP: c_lock canceled %llu\n",
+ (ulonglong) c_lock->trx->id);
+
+ /* have to bail out here to avoid lock_set_lock... */
+ return(lock);
+ }
+ } else {
+ HASH_INSERT(lock_t, hash, lock_sys->rec_hash,
+ lock_rec_fold(space, page_no), lock);
+ }
+#else
HASH_INSERT(lock_t, hash, lock_sys->rec_hash,
lock_rec_fold(space, page_no), lock);
+#endif /* WITH_WSREP */
if (lock_is_wait_not_by_other(type_mode)) {
lock_set_lock_and_trx_wait(lock, trx);
@@ -1753,6 +1929,9 @@ static
enum db_err
lock_rec_enqueue_waiting(
/*=====================*/
+#ifdef WITH_WSREP
+ lock_t* c_lock, /* conflicting lock */
+#endif
ulint type_mode,/*!< in: lock mode this
transaction is requesting:
LOCK_S or LOCK_X, possibly
@@ -1805,9 +1984,18 @@ lock_rec_enqueue_waiting(
}
if (lock == NULL) {
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) && trx->was_chosen_as_deadlock_victim) {
+ return(DB_DEADLOCK);
+ }
+ /* Enqueue the lock request that will wait to be granted */
+ lock = lock_rec_create(c_lock, type_mode | LOCK_WAIT,
+ block, heap_no, index, trx);
+#else
/* Enqueue the lock request that will wait to be granted */
lock = lock_rec_create(type_mode | LOCK_WAIT,
block, heap_no, index, trx);
+#endif /*WITH_WSREP */
} else {
ut_ad(lock->type_mode & LOCK_WAIT);
ut_ad(lock->type_mode & LOCK_CONV_BY_OTHER);
@@ -1892,7 +2080,19 @@ lock_rec_add_to_queue(
lock_t* other_lock
= lock_rec_other_has_expl_req(mode, 0, LOCK_WAIT,
block, heap_no, trx);
+#ifdef WITH_WSREP
+ /* this can potentionally assert with wsrep */
+ if (wsrep_on(trx->mysql_thd)) {
+ if (wsrep_debug && other_lock) {
+ fprintf(stderr,
+ "WSREP: InnoDB assert ignored\n");
+ }
+ } else {
+ ut_a(!other_lock);
+ }
+#else
ut_a(!other_lock);
+#endif /* WITH_WSREP */
}
#endif /* UNIV_DEBUG */
@@ -1945,7 +2145,11 @@ lock_rec_add_to_queue(
}
somebody_waits:
+#ifdef WITH_WSREP
+ return(lock_rec_create(NULL, type_mode, block, heap_no, index, trx));
+#else
return(lock_rec_create(type_mode, block, heap_no, index, trx));
+#endif
}
/** Record locking request status */
@@ -2005,7 +2209,11 @@ lock_rec_lock_fast(
if (lock == NULL) {
if (!impl) {
+#ifdef WITH_WSREP
+ lock_rec_create(NULL, mode, block, heap_no, index, trx);
+#else
lock_rec_create(mode, block, heap_no, index, trx);
+#endif
}
return(LOCK_REC_SUCCESS_CREATED);
@@ -2061,6 +2269,9 @@ lock_rec_lock_slow(
que_thr_t* thr) /*!< in: query thread */
{
trx_t* trx;
+#ifdef WITH_WSREP
+ lock_t* c_lock = NULL;
+#endif
lock_t* lock;
ut_ad(mutex_own(&kernel_mutex));
@@ -2102,7 +2313,12 @@ lock_rec_lock_slow(
/* The trx already has a strong enough lock on rec: do
nothing */
+#ifdef WITH_WSREP
+ } else if ((c_lock = lock_rec_other_has_conflicting(
+ mode, block, heap_no, trx))) {
+#else
} else if (lock_rec_other_has_conflicting(mode, block, heap_no, trx)) {
+#endif /* WITH_WSREP */
/* If another transaction has a non-gap conflicting request in
the queue, as this transaction does not have a lock strong
@@ -2110,8 +2326,16 @@ lock_rec_lock_slow(
ut_ad(lock == NULL);
enqueue_waiting:
+#ifdef WITH_WSREP
+ /* c_lock is NULL here if jump to enqueue_waiting happened
+ but it's ok because lock is not NULL in that case and c_lock
+ is not used. */
+ return(lock_rec_enqueue_waiting(c_lock, mode, block, heap_no,
+ lock, index, thr));
+#else
return(lock_rec_enqueue_waiting(mode, block, heap_no,
lock, index, thr));
+#endif /* WITH_WSREP */
} else if (!impl) {
/* Set the requested lock on the record */
@@ -2158,7 +2382,6 @@ lock_rec_lock(
ut_ad(mode - (LOCK_MODE_MASK & mode) == LOCK_GAP
|| mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP
|| mode - (LOCK_MODE_MASK & mode) == 0);
-
/* We try a simplified and faster subroutine for the most
common cases */
switch (lock_rec_lock_fast(impl, mode, block, heap_no, index, thr)) {
@@ -3590,6 +3813,26 @@ lock_deadlock_recursive(
stderr);
}
#endif /* UNIV_DEBUG */
+#ifdef WITH_WSREP
+ if (wsrep_debug)
+ fputs("WSREP: Deadlock detected\n", stderr);
+ if ((start != wait_lock->trx) &&
+ wsrep_thd_is_BF(start->mysql_thd, TRUE) &&
+ wsrep_thd_is_BF(
+ wait_lock->trx->mysql_thd, TRUE))
+ {
+ if (wsrep_trx_order_before(
+ start->mysql_thd,
+ wait_lock->trx->mysql_thd)) {
+
+ wait_lock->trx->was_chosen_as_deadlock_victim = TRUE;
+ lock_cancel_waiting_and_release(wait_lock);
+ return(LOCK_VICTIM_IS_OTHER);
+ } else {
+ return(LOCK_VICTIM_IS_START);
+ }
+ }
+#endif
if (trx_weight_ge(wait_lock->trx, start)) {
/* Our recursion starting point
@@ -3597,8 +3840,23 @@ lock_deadlock_recursive(
choose 'start' as the victim and roll
back it */
+#ifdef WITH_WSREP
+ if (!wsrep_thd_is_BF(
+ start->mysql_thd, FALSE))
+ {
+ return(LOCK_VICTIM_IS_START);
+ }
+#else
+ return(LOCK_VICTIM_IS_START);
+#endif
+ }
+#ifdef WITH_WSREP
+ if (wsrep_thd_is_BF(
+ wait_lock->trx->mysql_thd, TRUE))
+ {
return(LOCK_VICTIM_IS_START);
}
+#endif
lock_deadlock_found = TRUE;
@@ -3683,6 +3941,9 @@ UNIV_INLINE
lock_t*
lock_table_create(
/*==============*/
+#ifdef WITH_WSREP
+ lock_t* c_lock, /* conflicting lock */
+#endif
dict_table_t* table, /*!< in: database table in dictionary cache */
ulint type_mode,/*!< in: lock mode possibly ORed with
LOCK_WAIT */
@@ -3719,7 +3980,40 @@ lock_table_create(
lock->un_member.tab_lock.table = table;
+#ifdef WITH_WSREP
+ if (c_lock && wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
+ UT_LIST_INSERT_AFTER(
+ un_member.tab_lock.locks, table->locks, c_lock, lock);
+ } else {
+ UT_LIST_ADD_LAST(un_member.tab_lock.locks, table->locks, lock);
+ }
+
+ if (c_lock && c_lock->trx->que_state == TRX_QUE_LOCK_WAIT) {
+ if (wsrep_debug) {
+ fprintf(stderr,
+ "WSREP: table c_lock in wait: %llu new loc: %llu\n",
+ (ulonglong) c_lock->trx->id, lock->trx->id);
+ }
+
+ c_lock->trx->was_chosen_as_deadlock_victim = TRUE;
+ lock_cancel_waiting_and_release(c_lock->trx->wait_lock);
+
+ /* trx might not wait for c_lock, but some other lock
+ does not matter if wait_lock was released above
+ */
+ if (c_lock->trx->wait_lock == c_lock) {
+ lock_reset_lock_and_trx_wait(lock);
+ }
+
+ if (wsrep_debug) {
+ fprintf(stderr, "WSREP: c_lock canceled %llu\n",
+ (ulonglong) c_lock->trx->id);
+ }
+ }
+
+#else
UT_LIST_ADD_LAST(un_member.tab_lock.locks, table->locks, lock);
+#endif
if (UNIV_UNLIKELY(type_mode & LOCK_WAIT)) {
@@ -3865,6 +4159,9 @@ static
ulint
lock_table_enqueue_waiting(
/*=======================*/
+#ifdef WITH_WSREP
+ lock_t* c_lock, /* conflicting lock */
+#endif
ulint mode, /*!< in: lock mode this transaction is
requesting */
dict_table_t* table, /*!< in: table */
@@ -3906,7 +4203,14 @@ lock_table_enqueue_waiting(
/* Enqueue the lock request that will wait to be granted */
+#ifdef WITH_WSREP
+ if (trx->was_chosen_as_deadlock_victim) {
+ return(DB_DEADLOCK);
+ }
+ lock = lock_table_create(c_lock, table, mode | LOCK_WAIT, trx);
+#else
lock = lock_table_create(table, mode | LOCK_WAIT, trx);
+#endif
/* Check if a deadlock occurs: if yes, remove the lock request and
return an error code */
@@ -3964,7 +4268,11 @@ lock_table_other_has_incompatible(
if ((lock->trx != trx)
&& (!lock_mode_compatible(lock_get_mode(lock), mode))
&& (wait || !(lock_get_wait(lock)))) {
-
+#ifdef WITH_WSREP
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: table lock abort");
+ wsrep_kill_victim((trx_t *)trx, (lock_t *)lock);
+#endif
return(lock);
}
@@ -3988,6 +4296,9 @@ lock_table(
enum lock_mode mode, /*!< in: lock mode */
que_thr_t* thr) /*!< in: query thread */
{
+#ifdef WITH_WSREP
+ lock_t *c_lock;
+#endif
trx_t* trx;
ulint err;
@@ -4016,19 +4327,32 @@ lock_table(
/* We have to check if the new lock is compatible with any locks
other transactions have in the table lock queue. */
+#ifdef WITH_WSREP
+ if ((c_lock = (lock_t *)lock_table_other_has_incompatible(
+ trx, LOCK_WAIT, table, mode))) {
+#else
if (lock_table_other_has_incompatible(trx, LOCK_WAIT, table, mode)) {
+#endif
/* Another trx has a request on the table in an incompatible
mode: this trx may have to wait */
+#ifdef WITH_WSREP
+ err = lock_table_enqueue_waiting(c_lock, mode | flags, table, thr);
+#else
err = lock_table_enqueue_waiting(mode | flags, table, thr);
+#endif
lock_mutex_exit_kernel();
return(err);
}
+#ifdef WITH_WSREP
+ lock_table_create(c_lock, table, mode | flags, trx);
+#else
lock_table_create(table, mode | flags, trx);
+#endif
ut_a(!flags || mode == LOCK_S || mode == LOCK_X);
@@ -4946,6 +5270,7 @@ lock_rec_queue_validate(
if (!lock_rec_get_gap(lock) && !lock_get_wait(lock)) {
+#ifndef WITH_WSREP
enum lock_mode mode;
if (lock_get_mode(lock) == LOCK_S) {
@@ -4955,6 +5280,7 @@ lock_rec_queue_validate(
}
ut_a(!lock_rec_other_has_expl_req(
mode, 0, 0, block, heap_no, lock->trx));
+#endif /* WITH_WSREP */
} else if (lock_get_wait(lock) && !lock_rec_get_gap(lock)) {
@@ -5231,6 +5557,9 @@ lock_rec_insert_check_and_lock(
lock_t* lock;
ulint err;
ulint next_rec_heap_no;
+#ifdef WITH_WSREP
+ lock_t *c_lock;
+#endif
ut_ad(block->frame == page_align(rec));
@@ -5283,15 +5612,28 @@ lock_rec_insert_check_and_lock(
had to wait for their insert. Both had waiting gap type lock requests
on the successor, which produced an unnecessary deadlock. */
+#ifdef WITH_WSREP
+ if ((c_lock = lock_rec_other_has_conflicting(
+ LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION,
+ block, next_rec_heap_no, trx))) {
+#else
if (lock_rec_other_has_conflicting(
LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION,
block, next_rec_heap_no, trx)) {
+#endif
/* Note that we may get DB_SUCCESS also here! */
+#ifdef WITH_WSREP
+ err = lock_rec_enqueue_waiting(c_lock, LOCK_X | LOCK_GAP
+ | LOCK_INSERT_INTENTION,
+ block, next_rec_heap_no,
+ NULL, index, thr);
+#else
err = lock_rec_enqueue_waiting(LOCK_X | LOCK_GAP
| LOCK_INSERT_INTENTION,
block, next_rec_heap_no,
NULL, index, thr);
+#endif /* WITH_WSREP */
} else {
err = DB_SUCCESS;
}
@@ -5375,6 +5717,11 @@ lock_rec_convert_impl_to_expl(
implicit lock. Because cannot lock at this moment.*/
if (rec_get_deleted_flag(rec, rec_offs_comp(offsets))
+#ifdef WITH_WSREP
+ && !wsrep_thd_is_BF(impl_trx->mysql_thd, FALSE)
+ /* BF-BF conflict is possible if advancing into
+ lock_rec_other_has_conflicting*/
+#endif /* WITH_WSREP */
&& lock_rec_other_has_conflicting(
LOCK_X | LOCK_REC_NOT_GAP, block,
heap_no, impl_trx)) {
diff --git a/storage/innobase/os/os0file.c b/storage/innobase/os/os0file.c
index 8fc22fbd119..7f5a09a572e 100644
--- a/storage/innobase/os/os0file.c
+++ b/storage/innobase/os/os0file.c
@@ -89,6 +89,12 @@ UNIV_INTERN os_mutex_t os_file_seek_mutexes[OS_FILE_N_SEEK_MUTEXES];
/* In simulated aio, merge at most this many consecutive i/os */
#define OS_AIO_MERGE_N_CONSECUTIVE 64
+#ifdef WITH_INNODB_DISALLOW_WRITES /* write blocking compiled in */
+#define WAIT_ALLOW_WRITES() os_event_wait(srv_allow_writes_event) /* wait on srv_allow_writes_event before a file-modifying call */
+#else
+#define WAIT_ALLOW_WRITES() do { } while (0) /* feature compiled out: expands to an empty statement */
+#endif /* WITH_INNODB_DISALLOW_WRITES */
+
/**********************************************************************
InnoDB AIO Implementation:
@@ -726,7 +732,9 @@ os_file_create_tmpfile(void)
/*========================*/
{
FILE* file = NULL;
- int fd = innobase_mysql_tmpfile();
+ int fd;
+ WAIT_ALLOW_WRITES();
+ fd = innobase_mysql_tmpfile();
if (fd >= 0) {
file = fdopen(fd, "w+b");
@@ -1045,6 +1053,7 @@ os_file_create_directory(
return (TRUE);
#else
int rcode;
+ WAIT_ALLOW_WRITES();
rcode = mkdir(pathname, 0770);
@@ -1146,6 +1155,8 @@ try_again:
os_file_t file;
int create_flag;
ibool retry;
+ if (create_mode != OS_FILE_OPEN && create_mode != OS_FILE_OPEN_RAW)
+ WAIT_ALLOW_WRITES();
try_again:
ut_a(name);
@@ -1278,6 +1289,8 @@ os_file_create_simple_no_error_handling_func(
int create_flag;
ut_a(name);
+ if (create_mode != OS_FILE_OPEN && create_mode != OS_FILE_OPEN_RAW)
+ WAIT_ALLOW_WRITES();
if (create_mode == OS_FILE_OPEN) {
if (access_type == OS_FILE_READ_ONLY) {
@@ -1561,6 +1574,8 @@ try_again:
int create_flag;
ibool retry;
const char* mode_str = NULL;
+ if (create_mode != OS_FILE_OPEN && create_mode != OS_FILE_OPEN_RAW)
+ WAIT_ALLOW_WRITES();
DBUG_EXECUTE_IF(
"ib_create_table_fail_disk_full",
@@ -1729,6 +1744,7 @@ loop:
goto loop;
#else
int ret;
+ WAIT_ALLOW_WRITES();
ret = unlink(name);
@@ -1792,6 +1808,7 @@ loop:
goto loop;
#else
int ret;
+ WAIT_ALLOW_WRITES();
ret = unlink(name);
@@ -1832,6 +1849,7 @@ os_file_rename_func(
return(FALSE);
#else
int ret;
+ WAIT_ALLOW_WRITES();
ret = rename(oldpath, newpath);
@@ -2117,6 +2135,7 @@ os_file_set_eof(
HANDLE h = (HANDLE) _get_osfhandle(fileno(file));
return(SetEndOfFile(h));
#else /* __WIN__ */
+ WAIT_ALLOW_WRITES();
return(!ftruncate(fileno(file), ftell(file)));
#endif /* __WIN__ */
}
@@ -2211,6 +2230,7 @@ os_file_flush_func(
return(FALSE);
#else
int ret;
+ WAIT_ALLOW_WRITES();
#if defined(HAVE_DARWIN_THREADS)
# ifndef F_FULLFSYNC
@@ -2903,6 +2923,7 @@ retry:
return(FALSE);
#else
ssize_t ret;
+ WAIT_ALLOW_WRITES();
ret = os_file_pwrite(file, buf, n, offset, offset_high);
diff --git a/storage/innobase/rem/rem0rec.c b/storage/innobase/rem/rem0rec.c
index d938aa696dd..cb4e8ca1cb1 100644
--- a/storage/innobase/rem/rem0rec.c
+++ b/storage/innobase/rem/rem0rec.c
@@ -31,6 +31,9 @@ Created 5/30/1994 Heikki Tuuri
#include "mtr0mtr.h"
#include "mtr0log.h"
+#ifdef WITH_WSREP
+#include <ha_prototypes.h>
+#endif /* WITH_WSREP */
/* PHYSICAL RECORD (OLD STYLE)
===========================
@@ -1897,3 +1900,133 @@ rec_print(
}
}
#endif /* !UNIV_HOTBACKUP */
+#ifdef WITH_WSREP
+/*********************************************************************//**
+Builds the wsrep foreign key value of a record. The key fields of
+index_for are read from rec and appended to buf one after another:
+a NULL field is stored as the single byte 1, and a non-NULL field is
+preceded by a 0 byte when the matching column of index_ref is nullable.
+@return DB_SUCCESS with the key length stored in *buf_len, or DB_ERROR
+when a field does not fit into the *buf_len bytes available */
+int
+wsrep_rec_get_foreign_key(
+/*======================*/
+	byte		*buf,		/* out: extracted key */
+	ulint		*buf_len,	/* in/out: length of buf */
+	const rec_t*	rec,		/* in: physical record */
+	dict_index_t*	index_for,	/* in: index in foreign table */
+	dict_index_t*	index_ref,	/* in: index in referenced table */
+	ibool		new_protocol)	/* in: protocol > 1 */
+{
+	const byte*	data;
+	ulint		len;
+	ulint		key_len = 0;
+	ulint		i;
+	ulint		j;
+	uint		key_parts;
+	mem_heap_t*	heap = NULL;
+	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
+	const ulint*	offsets;
+
+	/* Assert on every pointer argument before its first use. */
+	ut_ad(rec);
+	ut_ad(index_for);
+	ut_ad(index_ref);
+
+	rec_offs_init(offsets_);
+	offsets = rec_get_offsets(rec, index_for, offsets_,
+				  ULINT_UNDEFINED, &heap);
+
+	ut_ad(rec_offs_validate(rec, NULL, offsets));
+
+	key_parts = dict_index_get_n_unique_in_tree(index_for);
+
+	/* When index_for is not clustered, the last of the key_parts
+	fields is left out of the key. */
+	for (i = 0;
+	     i < key_parts &&
+	       (index_for->type & DICT_CLUSTERED || i < key_parts - 1);
+	     i++) {
+		dict_field_t*	  field_f =
+			dict_index_get_nth_field(index_for, i);
+		const dict_col_t* col_f = dict_field_get_col(field_f);
+		dict_field_t*	  field_r =
+			dict_index_get_nth_field(index_ref, i);
+		const dict_col_t* col_r = dict_field_get_col(field_r);
+
+		data = rec_get_nth_field(rec, offsets, i, &len);
+
+		/* Worst case this field needs its data bytes plus one
+		marker byte; a NULL field needs the marker byte only. */
+		if (key_len + ((len != UNIV_SQL_NULL) ? len + 1 : 1) >
+		    *buf_len) {
+			fprintf (stderr,
+				 "WSREP: FK key len exceeded %lu %lu %lu\n",
+				 key_len, len, *buf_len);
+			goto err_out;
+		}
+
+		if (len == UNIV_SQL_NULL) {
+			ut_a(!(col_f->prtype & DATA_NOT_NULL));
+			*buf++ = 1;
+			key_len++;
+		} else if (!new_protocol) {
+			/* NOTE(review): this branch overwrites *buf_len
+			with the sort result and advances neither buf nor
+			key_len by len. It is kept exactly as it was,
+			presumably because it defines the key format of
+			the old protocol -- confirm before changing. */
+			if (!(col_r->prtype & DATA_NOT_NULL)) {
+				*buf++ = 0;
+				key_len++;
+			}
+			memcpy(buf, data, len);
+			*buf_len = wsrep_innobase_mysql_sort(
+				(int)(col_f->prtype & DATA_MYSQL_TYPE_MASK),
+				(uint)dtype_get_charset_coll(col_f->prtype),
+				buf, len, *buf_len);
+		} else { /* new protocol */
+			if (!(col_r->prtype & DATA_NOT_NULL)) {
+				*buf++ = 0;
+				key_len++;
+			}
+			switch (col_f->mtype) {
+			case DATA_INT:
+				/* A zero-length field has nothing to copy.
+				Skipping it keeps every write below inside
+				buf. */
+				if (len == 0) {
+					break;
+				}
+
+				/* Store the bytes of the field in reverse
+				order: buf[len - 1] receives data[0]. */
+				for (j = 0; j < len; j++) {
+					buf[len - 1 - j] = data[j];
+				}
+
+				/* Signed columns get the top bit of that
+				byte flipped. */
+				if (!(col_f->prtype & DATA_UNSIGNED)) {
+					buf[len-1] = (byte) (buf[len-1] ^ 128);
+				}
+
+				break;
+			case DATA_VARCHAR:
+			case DATA_VARMYSQL:
+			case DATA_CHAR:
+			case DATA_MYSQL:
+				/* Copy the actual data */
+				ut_memcpy(buf, data, len);
+				/* NOTE(review): the capacity passed here is
+				the whole *buf_len, not what remains after
+				the key_len bytes already written -- confirm
+				the helper never writes more than len
+				bytes. */
+				len = wsrep_innobase_mysql_sort(
+					(int)
+					(col_f->prtype & DATA_MYSQL_TYPE_MASK),
+					(uint)
+					dtype_get_charset_coll(col_f->prtype),
+					buf, len, *buf_len);
+				break;
+			case DATA_BLOB:
+			case DATA_BINARY:
+				memcpy(buf, data, len);
+				break;
+			default:
+				/* Other types copy nothing, but len bytes
+				are still accounted for below. */
+				break;
+			}
+
+			key_len += len;
+			buf	+= len;
+		}
+	}
+
+	rec_validate(rec, offsets);
+
+	if (UNIV_LIKELY_NULL(heap)) {
+		mem_heap_free(heap);
+	}
+
+	*buf_len = key_len;
+	return DB_SUCCESS;
+
+ err_out:
+	/* rec_get_offsets() may have allocated heap; release it here too. */
+	if (UNIV_LIKELY_NULL(heap)) {
+		mem_heap_free(heap);
+	}
+	return DB_ERROR;
+}
+#endif // WITH_WSREP
diff --git a/storage/innobase/row/row0ins.c b/storage/innobase/row/row0ins.c
index 5bdaf4b722b..c380390d62a 100644
--- a/storage/innobase/row/row0ins.c
+++ b/storage/innobase/row/row0ins.c
@@ -759,6 +759,14 @@ row_ins_invalidate_query_cache(
innobase_invalidate_query_cache(thr_get_trx(thr), buf, len);
mem_free(buf);
}
+#ifdef WITH_WSREP
+ulint wsrep_append_foreign_key(trx_t *trx,
+ dict_foreign_t* foreign,
+ const rec_t* clust_rec,
+ dict_index_t* clust_index,
+ ibool referenced,
+ ibool shared);
+#endif /* WITH_WSREP */
/*********************************************************************//**
Perform referential actions or checks when a parent row is deleted or updated
@@ -1072,6 +1080,18 @@ row_ins_foreign_check_on_constraint(
cascade->state = UPD_NODE_UPDATE_CLUSTERED;
+#ifdef WITH_WSREP
+ err = wsrep_append_foreign_key(
+ thr_get_trx(thr),
+ foreign,
+ clust_rec,
+ clust_index,
+ FALSE, FALSE);
+ if (err != DB_SUCCESS) {
+ fprintf(stderr,
+ "WSREP: foreign key append failed: %lu\n", err);
+ } else
+#endif
err = row_update_cascade_for_mysql(thr, cascade,
foreign->foreign_table);
@@ -1405,7 +1425,14 @@ run_again:
if (check_ref) {
err = DB_SUCCESS;
-
+#ifdef WITH_WSREP
+ err = wsrep_append_foreign_key(
+ thr_get_trx(thr),
+ foreign,
+ rec,
+ check_index,
+ check_ref, TRUE);
+#endif /* WITH_WSREP */
goto end_scan;
} else if (foreign->type != 0) {
/* There is an ON UPDATE or ON DELETE
diff --git a/storage/innobase/row/row0upd.c b/storage/innobase/row/row0upd.c
index e7ac42a566f..05924958edd 100644
--- a/storage/innobase/row/row0upd.c
+++ b/storage/innobase/row/row0upd.c
@@ -51,6 +51,10 @@ Created 12/27/1996 Heikki Tuuri
#include "pars0sym.h"
#include "eval0eval.h"
#include "buf0lru.h"
+#ifdef WITH_WSREP
+#include "ha_prototypes.h"
+extern my_bool wsrep_debug;
+#endif
/* What kind of latch and lock can we assume when the control comes to
@@ -170,6 +174,50 @@ func_exit:
return(is_referenced);
}
+#ifdef WITH_WSREP
+/*********************************************************************//**
+Checks whether an index is the foreign_index of any constraint in the
+foreign_list of its table. The data dictionary is frozen for the scan
+when trx->dict_operation_lock_mode is 0, and unfrozen again afterwards.
+@return TRUE if some constraint uses the index as its foreign_index */
+static
+ibool
+wsrep_row_upd_index_is_foreign(
+/*========================*/
+	dict_index_t*	index,	/*!< in: index */
+	trx_t*		trx)	/*!< in: transaction */
+{
+	dict_table_t*	owner		= index->table;
+	dict_foreign_t*	fk;
+	ibool		frozen_here	= FALSE;
+	ibool		found		= FALSE;
+
+	/* A table without foreign key constraints needs no scan. */
+	if (!UT_LIST_GET_FIRST(owner->foreign_list)) {
+		return(FALSE);
+	}
+
+	if (trx->dict_operation_lock_mode == 0) {
+		row_mysql_freeze_data_dictionary(trx);
+		frozen_here = TRUE;
+	}
+
+	for (fk = UT_LIST_GET_FIRST(owner->foreign_list);
+	     fk;
+	     fk = UT_LIST_GET_NEXT(foreign_list, fk)) {
+
+		if (fk->foreign_index == index) {
+			found = TRUE;
+			break;
+		}
+	}
+
+	/* Only undo a freeze that was taken in this call. */
+	if (frozen_here) {
+		row_mysql_unfreeze_data_dictionary(trx);
+	}
+
+	return(found);
+}
+#endif /* WITH_WSREP */
+
/*********************************************************************//**
Checks if possible foreign key constraints hold after a delete of the record
under pcur.
@@ -284,7 +332,127 @@ row_upd_check_references_constraints(
}
err = DB_SUCCESS;
+func_exit:
+ if (got_s_lock) {
+ row_mysql_unfreeze_data_dictionary(trx);
+ }
+
+ mem_heap_free(heap);
+
+ return(err);
+}
+#ifdef WITH_WSREP
+static
+ulint
+wsrep_row_upd_check_foreign_constraints(
+/*=================================*/
+ upd_node_t* node, /*!< in: row update node */
+ btr_pcur_t* pcur, /*!< in: cursor positioned on a record; NOTE: the
+ cursor position is lost in this function! */
+ dict_table_t* table, /*!< in: table in question */
+ dict_index_t* index, /*!< in: index of the cursor */
+ ulint* offsets,/*!< in/out: rec_get_offsets(pcur.rec, index) */
+ que_thr_t* thr, /*!< in: query thread */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ dict_foreign_t* foreign;
+ mem_heap_t* heap;
+ dtuple_t* entry;
+ trx_t* trx;
+ const rec_t* rec;
+ ulint n_ext;
+ ulint err;
+ ibool got_s_lock = FALSE;
+
+ if (UT_LIST_GET_FIRST(table->foreign_list) == NULL) {
+
+ return(DB_SUCCESS);
+ }
+
+ trx = thr_get_trx(thr);
+ if (wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
+
+ return(DB_SUCCESS);
+ }
+
+ /* TODO: make native slave thread bail out here */
+
+ rec = btr_pcur_get_rec(pcur);
+ ut_ad(rec_offs_validate(rec, index, offsets));
+ heap = mem_heap_create(500);
+
+ entry = row_rec_to_index_entry(ROW_COPY_DATA, rec, index, offsets,
+ &n_ext, heap);
+
+ mtr_commit(mtr);
+
+ mtr_start(mtr);
+
+ if (trx->dict_operation_lock_mode == 0) {
+ got_s_lock = TRUE;
+
+ row_mysql_freeze_data_dictionary(trx);
+ }
+
+ foreign = UT_LIST_GET_FIRST(table->foreign_list);
+
+ while (foreign) {
+ /* Note that we may have an update which updates the index
+ record, but does NOT update the first fields which are
+ referenced in a foreign key constraint. Then the update does
+ NOT break the constraint. */
+
+ if (foreign->foreign_index == index
+ && (node->is_delete
+ || row_upd_changes_first_fields_binary(
+ entry, index, node->update,
+ foreign->n_fields))) {
+
+ if (foreign->referenced_table == NULL) {
+ dict_table_get(foreign->referenced_table_name_lookup,
+ FALSE, DICT_ERR_IGNORE_NONE);
+ }
+
+ if (foreign->referenced_table) {
+ mutex_enter(&(dict_sys->mutex));
+
+ (foreign->referenced_table
+ ->n_foreign_key_checks_running)++;
+
+ mutex_exit(&(dict_sys->mutex));
+ }
+
+ /* NOTE that if the thread ends up waiting for a lock
+ we will release dict_operation_lock temporarily!
+ But the counter on the table protects 'foreign' from
+ being dropped while the check is running. */
+
+ err = row_ins_check_foreign_constraint(
+ TRUE, foreign, table, entry, thr);
+
+ if (foreign->referenced_table) {
+ mutex_enter(&(dict_sys->mutex));
+
+ ut_a(foreign->referenced_table
+ ->n_foreign_key_checks_running > 0);
+
+ (foreign->referenced_table
+ ->n_foreign_key_checks_running)--;
+
+ mutex_exit(&(dict_sys->mutex));
+ }
+
+ if (err != DB_SUCCESS) {
+
+ goto func_exit;
+ }
+ }
+
+ foreign = UT_LIST_GET_NEXT(foreign_list, foreign);
+ }
+
+ err = DB_SUCCESS;
func_exit:
if (got_s_lock) {
row_mysql_unfreeze_data_dictionary(trx);
@@ -294,6 +462,7 @@ func_exit:
return(err);
}
+#endif /* WITH_WSREP */
/*********************************************************************//**
Creates an update node for a query graph.
@@ -1564,10 +1733,16 @@ row_upd_sec_index_entry(
trx_t* trx = thr_get_trx(thr);
ulint mode = BTR_MODIFY_LEAF;
enum row_search_result search_result;
+#ifdef WITH_WSREP
+ ibool foreign;
+#endif /* WITH_WSREP */
index = node->index;
referenced = row_upd_index_is_referenced(index, trx);
+#ifdef WITH_WSREP
+ foreign = wsrep_row_upd_index_is_foreign(index, trx);
+#endif /* WITH_WSREP */
heap = mem_heap_create(1024);
@@ -1630,6 +1805,9 @@ row_upd_sec_index_entry(
if (!rec_get_deleted_flag(
rec, dict_table_is_comp(index->table))) {
+#ifdef WITH_WSREP
+ que_node_t *parent = que_node_get_parent(node);
+#endif /* WITH_WSREP */
err = btr_cur_del_mark_set_sec_rec(
0, btr_cur, TRUE, thr, &mtr);
@@ -1647,6 +1825,38 @@ row_upd_sec_index_entry(
node, &pcur, index->table,
index, offsets, thr, &mtr);
}
+#ifdef WITH_WSREP
+ if (err == DB_SUCCESS && !referenced &&
+ !(parent && que_node_get_type(parent) ==
+ QUE_NODE_UPDATE &&
+ ((upd_node_t*)parent)->cascade_node == node) &&
+ foreign
+ ) {
+ ulint* offsets =
+ rec_get_offsets(
+ rec, index, NULL,
+ ULINT_UNDEFINED, &heap);
+ err = wsrep_row_upd_check_foreign_constraints(
+ node, &pcur, index->table,
+ index, offsets, thr, &mtr);
+ switch (err) {
+ case DB_SUCCESS:
+ case DB_NO_REFERENCED_ROW:
+ err = DB_SUCCESS;
+ break;
+ case DB_DEADLOCK:
+ if (wsrep_debug)
+ fprintf (stderr,
+ "WSREP: sec index FK check fail for deadlock");
+ break;
+ default:
+ fprintf (stderr,
+ "WSREP: referenced FK check fail: %lu",
+ err);
+ break;
+ }
+ }
+#endif /* WITH_WSREP */
}
break;
}
@@ -1797,6 +2007,9 @@ row_upd_clust_rec_by_insert(
que_thr_t* thr, /*!< in: query thread */
ibool referenced,/*!< in: TRUE if index may be referenced in
a foreign key constraint */
+#ifdef WITH_WSREP
+ ibool foreign, /*!< in: TRUE if index is foreign key index */
+#endif /* WITH_WSREP */
mtr_t* mtr) /*!< in/out: mtr; gets committed here */
{
mem_heap_t* heap;
@@ -1810,6 +2023,9 @@ row_upd_clust_rec_by_insert(
rec_t* rec;
ulint* offsets = NULL;
+#ifdef WITH_WSREP
+ que_node_t *parent = que_node_get_parent(node);
+#endif /* WITH_WSREP */
ut_ad(node);
ut_ad(dict_index_is_clust(index));
@@ -1892,6 +2108,34 @@ err_exit:
goto err_exit;
}
}
+#ifdef WITH_WSREP
+ if (!referenced &&
+ !(parent && que_node_get_type(parent) == QUE_NODE_UPDATE &&
+ ((upd_node_t*)parent)->cascade_node == node) &&
+ foreign
+ ) {
+ err = wsrep_row_upd_check_foreign_constraints(
+ node, pcur, table, index, offsets, thr, mtr);
+ switch (err) {
+ case DB_SUCCESS:
+ case DB_NO_REFERENCED_ROW:
+ err = DB_SUCCESS;
+ break;
+ case DB_DEADLOCK:
+ if (wsrep_debug) fprintf (stderr,
+ "WSREP: insert FK check fail for deadlock");
+ break;
+ default:
+ fprintf (stderr,
+ "WSREP: referenced FK check fail: %lu",
+ err);
+ break;
+ }
+ if (err != DB_SUCCESS) {
+ goto err_exit;
+ }
+ }
+#endif /* WITH_WSREP */
}
mtr_commit(mtr);
@@ -2060,11 +2304,18 @@ row_upd_del_mark_clust_rec(
ibool referenced,
/*!< in: TRUE if index may be referenced in
a foreign key constraint */
+#ifdef WITH_WSREP
+ ibool foreign,/*!< in: TRUE if index is foreign key index */
+#endif /* WITH_WSREP */
mtr_t* mtr) /*!< in: mtr; gets committed here */
{
btr_pcur_t* pcur;
btr_cur_t* btr_cur;
ulint err;
+#ifdef WITH_WSREP
+ rec_t* rec;
+ que_node_t *parent = que_node_get_parent(node);
+#endif /* WITH_WSREP */
ut_ad(node);
ut_ad(dict_index_is_clust(index));
@@ -2081,15 +2332,49 @@ row_upd_del_mark_clust_rec(
/* Mark the clustered index record deleted; we do not have to check
locks, because we assume that we have an x-lock on the record */
+#ifdef WITH_WSREP
+ rec = btr_cur_get_rec(btr_cur);
+#endif /* WITH_WSREP */
+
err = btr_cur_del_mark_set_clust_rec(
BTR_NO_LOCKING_FLAG, btr_cur_get_block(btr_cur),
+#ifdef WITH_WSREP
+ rec, index, offsets, TRUE, thr, mtr);
+#else
btr_cur_get_rec(btr_cur), index, offsets, TRUE, thr, mtr);
+#endif /* WITH_WSREP */
if (err == DB_SUCCESS && referenced) {
/* NOTE that the following call loses the position of pcur ! */
err = row_upd_check_references_constraints(
node, pcur, index->table, index, offsets, thr, mtr);
}
+#ifdef WITH_WSREP
+ if (err == DB_SUCCESS && !referenced &&
+ !(parent && que_node_get_type(parent) == QUE_NODE_UPDATE &&
+ ((upd_node_t*)parent)->cascade_node == node) &&
+ thr_get_trx(thr) &&
+ foreign
+ ) {
+ err = wsrep_row_upd_check_foreign_constraints(
+ node, pcur, index->table, index, offsets, thr, mtr);
+ switch (err) {
+ case DB_SUCCESS:
+ case DB_NO_REFERENCED_ROW:
+ err = DB_SUCCESS;
+ break;
+ case DB_DEADLOCK:
+ if (wsrep_debug) fprintf (stderr,
+ "WSREP: clust rec FK check fail for deadlock");
+ break;
+ default:
+ fprintf (stderr,
+ "WSREP: clust rec referenced FK check fail: %lu",
+ err);
+ break;
+ }
+ }
+#endif /* WITH_WSREP */
mtr_commit(mtr);
@@ -2118,11 +2403,17 @@ row_upd_clust_step(
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets;
ibool referenced;
+#ifdef WITH_WSREP
+ ibool foreign;
+#endif /* WITH_WSREP */
rec_offs_init(offsets_);
index = dict_table_get_first_index(node->table);
referenced = row_upd_index_is_referenced(index, thr_get_trx(thr));
+#ifdef WITH_WSREP
+ foreign = wsrep_row_upd_index_is_foreign(index, thr_get_trx(thr));
+#endif /* WITH_WSREP */
pcur = node->pcur;
@@ -2192,7 +2483,11 @@ row_upd_clust_step(
if (node->is_delete) {
err = row_upd_del_mark_clust_rec(
+#ifdef WITH_WSREP
+ node, index, offsets, thr, referenced, foreign, mtr);
+#else
node, index, offsets, thr, referenced, mtr);
+#endif /* WITH_WSREP */
if (err == DB_SUCCESS) {
node->state = UPD_NODE_UPDATE_ALL_SEC;
@@ -2243,7 +2538,11 @@ exit_func:
externally! */
err = row_upd_clust_rec_by_insert(
+#ifdef WITH_WSREP
+ node, index, thr, referenced, foreign, mtr);
+#else
node, index, thr, referenced, mtr);
+#endif /* WITH_WSREP */
if (err != DB_SUCCESS) {
diff --git a/storage/innobase/srv/srv0srv.c b/storage/innobase/srv/srv0srv.c
index 90f72f5adf3..ca3e960b842 100644
--- a/storage/innobase/srv/srv0srv.c
+++ b/storage/innobase/srv/srv0srv.c
@@ -90,6 +90,10 @@ Created 10/8/1995 Heikki Tuuri
#include "mysql/plugin.h"
#include "mysql/service_thd_wait.h"
+#ifdef WITH_WSREP
+/* wsrep_debug has to be declared with the same type in every file that
+uses it; row0upd.c declares it as my_bool. Declaring it as int here would
+give the one object two incompatible types, which is undefined behaviour. */
+extern my_bool wsrep_debug;
+extern int wsrep_trx_is_aborting(void *thd_ptr);
+#endif
/* The following counter is incremented whenever there is some user activity
in the server */
UNIV_INTERN ulint srv_activity_count = 0;
@@ -202,6 +206,10 @@ srv_printf_innodb_monitor() will request mutex acquisition
with mutex_enter(), which will wait until it gets the mutex. */
#define MUTEX_NOWAIT(mutex_skipped) ((mutex_skipped) < MAX_MUTEX_NOWAIT)
+#ifdef WITH_INNODB_DISALLOW_WRITES
+UNIV_INTERN os_event_t srv_allow_writes_event;
+#endif /* WITH_INNODB_DISALLOW_WRITES */
+
/** The sort order table of the MySQL latin1_swedish_ci character set
collation */
UNIV_INTERN const byte* srv_latin1_ordering;
@@ -375,6 +383,9 @@ struct srv_conc_slot_struct{
free to proceed; but
reserved may still be
TRUE at that point */
+#ifdef WITH_WSREP
+ void *thd; /*!< to see priority */
+#endif
UT_LIST_NODE_T(srv_conc_slot_t) srv_conc_queue; /*!< queue node */
};
@@ -1094,8 +1105,20 @@ srv_init(void)
conc_slot->reserved = FALSE;
conc_slot->event = os_event_create(NULL);
ut_a(conc_slot->event);
+#ifdef WITH_WSREP
+ conc_slot->thd = NULL;
+#endif /* WITH_WSREP */
}
+#ifdef WITH_INNODB_DISALLOW_WRITES
+ /* Writes have to be enabled on init or else we hang. Thus, we
+ always set the event here regardless of innobase_disallow_writes.
+ That flag will always be 0 at this point because it isn't settable
+ via my.cnf or command line arg. */
+ srv_allow_writes_event = os_event_create(NULL);
+ os_event_set(srv_allow_writes_event);
+#endif /* WITH_INNODB_DISALLOW_WRITES */
+
/* Initialize some INFORMATION SCHEMA internal structures */
trx_i_s_cache_init(trx_i_s_cache);
}
@@ -1144,6 +1167,23 @@ srv_general_init(void)
/* Maximum allowable purge history length. <=0 means 'infinite'. */
UNIV_INTERN ulong srv_max_purge_lag = 0;
+#ifdef WITH_WSREP
+/*********************************************************************//**
+Sets the event stored in trx->wsrep_event, if there is one. The field is
+read and the event is set while srv_conc_mutex is held. */
+UNIV_INTERN
+void
+wsrep_srv_conc_cancel_wait(
+/*==================*/
+	trx_t*	trx)	/*!< in: transaction object associated with the
+			thread */
+{
+	os_event_t	pending;
+
+	os_fast_mutex_lock(&srv_conc_mutex);
+
+	pending = trx->wsrep_event;
+
+	if (pending) {
+		if (wsrep_debug) {
+			fprintf(stderr, "WSREP: conc slot cancel\n");
+		}
+
+		os_event_set(pending);
+	}
+
+	os_fast_mutex_unlock(&srv_conc_mutex);
+}
+#endif /* WITH_WSREP */
/*********************************************************************//**
Puts an OS thread to wait if there are too many concurrent threads
(>= srv_thread_concurrency) inside InnoDB. The threads wait in a FIFO queue. */
@@ -1181,6 +1221,18 @@ srv_conc_enter_innodb(
return;
}
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
+ srv_conc_force_enter_innodb(trx);
+ return;
+ }
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_trx_is_aborting(trx->mysql_thd)) {
+ srv_conc_force_enter_innodb(trx);
+ return;
+ }
+#endif
os_fast_mutex_lock(&srv_conc_mutex);
retry:
if (trx->declared_to_be_inside_innodb) {
@@ -1273,6 +1325,9 @@ retry:
/* Add to the queue */
slot->reserved = TRUE;
slot->wait_ended = FALSE;
+#ifdef WITH_WSREP
+ slot->thd = trx->mysql_thd;
+#endif
UT_LIST_ADD_LAST(srv_conc_queue, srv_conc_queue, slot);
@@ -1280,6 +1335,19 @@ retry:
srv_conc_n_waiting_threads++;
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_trx_is_aborting(trx->mysql_thd)) {
+ srv_conc_n_waiting_threads--;
+ os_fast_mutex_unlock(&srv_conc_mutex);
+ if (wsrep_debug)
+ fprintf(stderr, "srv_conc_enter due to MUST_ABORT");
+ trx->declared_to_be_inside_innodb = TRUE;
+ trx->n_tickets_to_enter_innodb = SRV_FREE_TICKETS_TO_ENTER;
+ return;
+ }
+ trx->wsrep_event = slot->event;
+#endif /* WITH_WSREP */
os_fast_mutex_unlock(&srv_conc_mutex);
/* Go to wait for the event; when a thread leaves InnoDB it will
@@ -1294,6 +1362,9 @@ retry:
thd_wait_begin(trx->mysql_thd, THD_WAIT_USER_LOCK);
os_event_wait(slot->event);
thd_wait_end(trx->mysql_thd);
+#ifdef WITH_WSREP
+ trx->wsrep_event = NULL;
+#endif /* WITH_WSREP */
trx->op_info = "";
@@ -1305,6 +1376,9 @@ retry:
incremented the thread counter on behalf of this thread */
slot->reserved = FALSE;
+#ifdef WITH_WSREP
+ slot->thd = NULL;
+#endif
UT_LIST_REMOVE(srv_conc_queue, srv_conc_queue, slot);
@@ -1375,6 +1449,9 @@ srv_conc_force_exit_innodb(
trx->n_tickets_to_enter_innodb = 0;
if (srv_conc_n_threads < (lint)srv_thread_concurrency) {
+#ifdef WITH_WSREP
+ srv_conc_slot_t* wsrep_slot;
+#endif
/* Look for a slot where a thread is waiting and no other
thread has yet released the thread */
@@ -1384,6 +1461,19 @@ srv_conc_force_exit_innodb(
slot = UT_LIST_GET_NEXT(srv_conc_queue, slot);
}
+#ifdef WITH_WSREP
+ /* look for aborting trx, they must be released asap */
+ wsrep_slot= slot;
+ while (wsrep_slot && (wsrep_slot->wait_ended == TRUE ||
+ !wsrep_trx_is_aborting(wsrep_slot->thd))) {
+ wsrep_slot = UT_LIST_GET_NEXT(srv_conc_queue, wsrep_slot);
+ }
+ if (wsrep_slot) {
+ slot = wsrep_slot;
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: releasing aborting thd\n");
+ }
+#endif
if (slot != NULL) {
slot->wait_ended = TRUE;
@@ -1761,7 +1851,20 @@ srv_suspend_mysql_thread(
if (lock_wait_timeout < 100000000
&& wait_time > (double) lock_wait_timeout) {
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_BF(trx->mysql_thd, TRUE)) {
+ fprintf(stderr,
+ "WSREP: BF long lock wait ended after %.f sec\n",
+ wait_time);
+ srv_print_innodb_monitor = FALSE;
+ srv_print_innodb_lock_monitor = FALSE;
+ } else {
+#endif
trx->error_state = DB_LOCK_WAIT_TIMEOUT;
+#ifdef WITH_WSREP
+ }
+#endif
}
if (trx_is_interrupted(trx)) {
@@ -2316,6 +2419,27 @@ exit_func:
OS_THREAD_DUMMY_RETURN;
}
+#ifdef WITH_WSREP
+/*********************************************************************//**
+Tells whether the transaction waiting in a slot belongs to a wsrep
+priority (BF) thread. For such a thread this also logs a message, turns
+on the InnoDB monitor and lock monitor output, and sets
+srv_lock_timeout_thread_event.
+@return TRUE for a priority thread, FALSE for a regular lock timeout */
+static ibool
+wsrep_is_BF_lock_timeout(
+/*====================*/
+	srv_slot_t*	slot)	/* in: lock slot to check for lock priority */
+{
+	void*	thd = thr_get_trx(slot->thr)->mysql_thd;
+
+	/* Regular case: wsrep is off for this thd, or it is not BF. */
+	if (!wsrep_on(thd) || !wsrep_thd_is_BF(thd, TRUE)) {
+		return FALSE;
+	}
+
+	fprintf(stderr, "WSREP: BF lock wait long\n");
+
+	srv_print_innodb_monitor = TRUE;
+	srv_print_innodb_lock_monitor = TRUE;
+
+	os_event_set(srv_lock_timeout_thread_event);
+
+	return TRUE;
+}
+#endif /* WITH_WSREP */
/*********************************************************************//**
A thread which wakes up threads whose lock wait may have lasted too long.
@return a dummy parameter */
@@ -2384,8 +2508,14 @@ loop:
granted: in that case do nothing */
if (trx->wait_lock) {
+#ifdef WITH_WSREP
+ if (!wsrep_is_BF_lock_timeout(slot)) {
+#endif
lock_cancel_waiting_and_release(
trx->wait_lock);
+#ifdef WITH_WSREP
+ }
+#endif
}
}
}
@@ -2504,7 +2634,20 @@ loop:
if (sync_array_print_long_waits(&waiter, &sema)
&& sema == old_sema && os_thread_eq(waiter, old_waiter)) {
+#if defined(WITH_WSREP) && defined(WITH_INNODB_DISALLOW_WRITES)
+ if (srv_allow_writes_event->is_set) {
+#endif /* WITH_WSREP */
fatal_cnt++;
+#if defined(WITH_WSREP) && defined(WITH_INNODB_DISALLOW_WRITES)
+ } else {
+ fprintf(stderr,
+ "WSREP: avoiding InnoDB self crash due to long "
+ "semaphore wait of > %lu seconds\n"
+ "Server is processing SST donor operation, "
+ "fatal_cnt now: %lu",
+ (ulong) srv_fatal_semaphore_wait_threshold, fatal_cnt);
+ }
+#endif /* WITH_WSREP */
if (fatal_cnt > 10) {
fprintf(stderr,
diff --git a/storage/innobase/trx/trx0roll.c b/storage/innobase/trx/trx0roll.c
index ffd7bb3d146..d0bbf7fd7c2 100644
--- a/storage/innobase/trx/trx0roll.c
+++ b/storage/innobase/trx/trx0roll.c
@@ -42,6 +42,9 @@ Created 3/26/1996 Heikki Tuuri
#include "row0mysql.h"
#include "lock0lock.h"
#include "pars0pars.h"
+#ifdef WITH_WSREP
+#include "ha_prototypes.h"
+#endif /* WITH_WSREP */
/** This many pages must be undone before a truncate is tried within
rollback */
@@ -147,6 +150,12 @@ trx_rollback_for_mysql(
trx->op_info = "";
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ trx->was_chosen_as_deadlock_victim) {
+ trx->was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
return(err);
}
@@ -174,6 +183,12 @@ trx_rollback_last_sql_stat_for_mysql(
trx->op_info = "";
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ trx->was_chosen_as_deadlock_victim) {
+ trx->was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
return(err);
}
@@ -1123,6 +1138,12 @@ trx_rollback(
srv_que_task_enqueue_low(thr);
/* srv_que_task_enqueue_low(thr2); */
}
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ trx->was_chosen_as_deadlock_victim) {
+ trx->was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
}
/****************************************************************//**
@@ -1281,6 +1302,12 @@ trx_finish_rollback_off_kernel(
sig = next_sig;
}
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ trx->was_chosen_as_deadlock_victim) {
+ trx->was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
}
/*********************************************************************//**
diff --git a/storage/innobase/trx/trx0sys.c b/storage/innobase/trx/trx0sys.c
index 256e22d1b50..a0fcc2b2370 100644
--- a/storage/innobase/trx/trx0sys.c
+++ b/storage/innobase/trx/trx0sys.c
@@ -44,6 +44,10 @@ Created 3/26/1996 Heikki Tuuri
#include "os0file.h"
#include "read0read.h"
+#ifdef WITH_WSREP
+#include "ha_prototypes.h" /* wsrep_is_wsrep_xid() */
+#endif /* WITH_WSREP */
+
/** The file format tag structure with id and name. */
struct file_format_struct {
ulint id; /*!< id of the file format */
@@ -691,10 +695,14 @@ trx_sys_update_mysql_binlog_offset(
ib_int64_t offset, /*!< in: position in that log file */
ulint field, /*!< in: offset of the MySQL log info field in
the trx sys header */
+#ifdef WITH_WSREP
+ trx_sysf_t* sys_header, /*!< in: trx sys header */
+#endif /* WITH_WSREP */
mtr_t* mtr) /*!< in: mtr */
{
+#ifndef WITH_WSREP
trx_sysf_t* sys_header;
-
+#endif /* !WITH_WSREP */
if (ut_strlen(file_name) >= TRX_SYS_MYSQL_LOG_NAME_LEN) {
/* We cannot fit the name to the 512 bytes we have reserved */
@@ -702,7 +710,9 @@ trx_sys_update_mysql_binlog_offset(
return;
}
+#ifndef WITH_WSREP
sys_header = trx_sysf_get(mtr);
+#endif /* !WITH_WSREP */
if (mach_read_from_4(sys_header + field
+ TRX_SYS_MYSQL_LOG_MAGIC_N_FLD)
@@ -789,6 +799,125 @@ trx_sys_print_mysql_binlog_offset(void)
mtr_commit(&mtr);
}
+#ifdef WITH_WSREP
+
+#ifdef UNIV_DEBUG
+static long long trx_sys_cur_xid_seqno = -1;
+static unsigned char trx_sys_cur_xid_uuid[16];
+
+/** Debug helper: copies sizeof(long long) bytes found at byte offset 24
+of xid->data into a long long, without any byte-order conversion.
+@return the value read from the XID data */
+long long
+read_wsrep_xid_seqno(
+/*=================*/
+	const XID*	xid)	/*!< in: XID to read from */
+{
+	long long	value;
+
+	/* memcpy instead of a pointer cast: the offset need not be aligned */
+	memcpy(&value, &xid->data[24], sizeof value);
+
+	return(value);
+}
+
+/** Debug helper: copies the 16 bytes found at byte offset 8 of xid->data
+into buf. The caller must supply room for at least 16 bytes. */
+void
+read_wsrep_xid_uuid(
+/*================*/
+	const XID*	xid,	/*!< in: XID to read from */
+	unsigned char*	buf)	/*!< out: receives 16 bytes */
+{
+	const void*	src = &xid->data[8];
+
+	memcpy(buf, src, 16);
+}
+
+#endif /* UNIV_DEBUG */
+
+/*****************************************************************//**
+Stores a wsrep XID in the wsrep XID area of the trx system header
+(magic number, formatID, gtrid_length, bqual_length and the XIDDATASIZE
+data bytes), logging every write through the given mini-transaction.
+
+In UNIV_DEBUG builds the function also remembers the UUID and seqno of
+the last XID it was given and asserts that, as long as the UUID stays the
+same, the seqno strictly increases. */
+void
+trx_sys_update_wsrep_checkpoint(
+/*============================*/
+	const XID*	xid,		/*!< in: transaction XID */
+	trx_sysf_t*	sys_header,	/*!< in: sys_header */
+	mtr_t*		mtr)		/*!< in: mtr */
+{
+	/* Validate the arguments before anything below dereferences xid. */
+	ut_ad(xid && mtr && sys_header);
+	ut_a(xid->formatID == -1 || wsrep_is_wsrep_xid(xid));
+
+#ifdef UNIV_DEBUG
+	{
+		/* Check that seqno is monotonically increasing */
+		unsigned char	xid_uuid[16];
+		long long	xid_seqno = read_wsrep_xid_seqno(xid);
+
+		read_wsrep_xid_uuid(xid, xid_uuid);
+
+		/* Compare the whole UUID: matching only a prefix would
+		treat two different histories as one, could trigger a
+		false assertion and would leave the remembered UUID
+		stale. */
+		if (!memcmp(xid_uuid, trx_sys_cur_xid_uuid,
+			    sizeof(xid_uuid))) {
+			ut_ad(xid_seqno > trx_sys_cur_xid_seqno);
+		} else {
+			memcpy(trx_sys_cur_xid_uuid, xid_uuid,
+			       sizeof(xid_uuid));
+		}
+
+		trx_sys_cur_xid_seqno = xid_seqno;
+	}
+#endif /* UNIV_DEBUG */
+
+	/* Stamp the magic number only if the area does not carry it yet */
+	if (mach_read_from_4(sys_header + TRX_SYS_WSREP_XID_INFO
+			     + TRX_SYS_WSREP_XID_MAGIC_N_FLD)
+	    != TRX_SYS_WSREP_XID_MAGIC_N) {
+		mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+				 + TRX_SYS_WSREP_XID_MAGIC_N_FLD,
+				 TRX_SYS_WSREP_XID_MAGIC_N,
+				 MLOG_4BYTES, mtr);
+	}
+
+	mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+			 + TRX_SYS_WSREP_XID_FORMAT,
+			 (int)xid->formatID,
+			 MLOG_4BYTES, mtr);
+	mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+			 + TRX_SYS_WSREP_XID_GTRID_LEN,
+			 (int)xid->gtrid_length,
+			 MLOG_4BYTES, mtr);
+	mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+			 + TRX_SYS_WSREP_XID_BQUAL_LEN,
+			 (int)xid->bqual_length,
+			 MLOG_4BYTES, mtr);
+	mlog_write_string(sys_header + TRX_SYS_WSREP_XID_INFO
+			  + TRX_SYS_WSREP_XID_DATA,
+			  (const unsigned char*) xid->data,
+			  XIDDATASIZE, mtr);
+}
+
+/*****************************************************************//**
+Reads the wsrep XID stored in the trx system header into *xid.
+If the wsrep XID area does not carry the expected magic number, *xid is
+zero-filled, its formatID is set to -1, and that XID is written to the
+header via trx_sys_update_wsrep_checkpoint() before returning. The work
+is done inside a mini-transaction local to this function. */
+void
+trx_sys_read_wsrep_checkpoint(
+/*==========================*/
+	XID*	xid)	/*!< out: XID read from (or written to) the header */
+{
+	mtr_t		mtr;
+	trx_sysf_t*	header;
+	trx_sysf_t*	xid_area;
+	ulint		magic;
+
+	ut_ad(xid);
+
+	mtr_start(&mtr);
+
+	header = trx_sysf_get(&mtr);
+	xid_area = header + TRX_SYS_WSREP_XID_INFO;
+
+	magic = mach_read_from_4(xid_area + TRX_SYS_WSREP_XID_MAGIC_N_FLD);
+
+	if (magic == TRX_SYS_WSREP_XID_MAGIC_N) {
+		/* A checkpoint has been stored: copy it out field by field */
+		xid->formatID = (int)mach_read_from_4(
+			xid_area + TRX_SYS_WSREP_XID_FORMAT);
+		xid->gtrid_length = (int)mach_read_from_4(
+			xid_area + TRX_SYS_WSREP_XID_GTRID_LEN);
+		xid->bqual_length = (int)mach_read_from_4(
+			xid_area + TRX_SYS_WSREP_XID_BQUAL_LEN);
+		ut_memcpy(xid->data,
+			  xid_area + TRX_SYS_WSREP_XID_DATA,
+			  XIDDATASIZE);
+	} else {
+		/* Nothing stored yet: hand back a zeroed XID with
+		formatID -1 and initialise the header area with it */
+		memset(xid, 0, sizeof(*xid));
+		xid->formatID = -1;
+		trx_sys_update_wsrep_checkpoint(xid, header, &mtr);
+	}
+
+	mtr_commit(&mtr);
+}
+
+#endif /* WITH_WSREP */
+
/*****************************************************************//**
Prints to stderr the MySQL master log offset info in the trx system header if
the magic number shows it valid. */
diff --git a/storage/innobase/trx/trx0trx.c b/storage/innobase/trx/trx0trx.c
index 2d585b7507f..ce43c80e826 100644
--- a/storage/innobase/trx/trx0trx.c
+++ b/storage/innobase/trx/trx0trx.c
@@ -193,6 +193,9 @@ trx_create(
/* Remember to free the vector explicitly. */
trx->autoinc_locks = ib_vector_create(
mem_heap_create(sizeof(ib_vector_t) + sizeof(void*) * 4), 4);
+#ifdef WITH_WSREP
+ trx->wsrep_event = NULL;
+#endif /* WITH_WSREP */
return(trx);
}
@@ -714,6 +717,11 @@ trx_start_low(
trx->id = trx_sys_get_new_trx_id();
+#ifdef WITH_WSREP
+ memset(&trx->xid, 0, sizeof(trx->xid));
+ trx->xid.formatID = -1;
+#endif /* WITH_WSREP */
+
/* The initial value for trx->no: IB_ULONGLONG_MAX is used in
read_view_open_now: */
@@ -818,6 +826,9 @@ trx_write_serialisation_history(
/*============================*/
trx_t* trx) /*!< in: transaction */
{
+#ifdef WITH_WSREP
+ trx_sysf_t* sys_header;
+#endif /* WITH_WSREP */
mtr_t mtr;
trx_rseg_t* rseg;
@@ -867,6 +878,15 @@ trx_write_serialisation_history(
mutex_exit(&rseg->mutex);
+#ifdef WITH_WSREP
+ sys_header = trx_sysf_get(&mtr);
+ /* Update latest MySQL wsrep XID in trx sys header. */
+ if (wsrep_is_wsrep_xid(&trx->xid))
+ {
+ trx_sys_update_wsrep_checkpoint(&trx->xid, sys_header, &mtr);
+ }
+#endif /* WITH_WSREP */
+
/* Update the latest MySQL binlog name and offset info
in trx sys header if MySQL binlogging is on or the database
server is a MySQL replication slave */
@@ -877,7 +897,11 @@ trx_write_serialisation_history(
trx_sys_update_mysql_binlog_offset(
trx->mysql_log_file_name,
trx->mysql_log_offset,
- TRX_SYS_MYSQL_LOG_INFO, &mtr);
+ TRX_SYS_MYSQL_LOG_INFO,
+#ifdef WITH_WSREP
+ sys_header,
+#endif /* WITH_WSREP */
+ &mtr);
trx->mysql_log_file_name = NULL;
}
@@ -1064,6 +1088,12 @@ trx_commit_off_kernel(
ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
ut_ad(UT_LIST_GET_LEN(trx->trx_locks) == 0);
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ trx->was_chosen_as_deadlock_victim) {
+ trx->was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
UT_LIST_REMOVE(trx_list, trx_sys->trx_list, trx);
trx->error_state = DB_SUCCESS;
diff --git a/storage/innobase/ut/ut0ut.c b/storage/innobase/ut/ut0ut.c
index 699af1fcaa1..8b9501ee8bc 100644
--- a/storage/innobase/ut/ut0ut.c
+++ b/storage/innobase/ut/ut0ut.c
@@ -554,7 +554,7 @@ ut_print_namel(
trx ? trx->mysql_thd : NULL,
table_id);
- fwrite(buf, 1, bufend - buf, f);
+ (void) fwrite(buf, 1, bufend - buf, f);
}
/**********************************************************************//**
@@ -575,7 +575,7 @@ ut_copy_file(
? (size_t) len
: sizeof buf;
size_t size = fread(buf, 1, maxs, src);
- fwrite(buf, 1, size, dest);
+ (void) fwrite(buf, 1, size, dest);
len -= (long) size;
if (size < maxs) {
break;
diff --git a/storage/tokudb/CMakeLists.txt b/storage/tokudb/CMakeLists.txt
index fa9178a65a5..453a096ace7 100644
--- a/storage/tokudb/CMakeLists.txt
+++ b/storage/tokudb/CMakeLists.txt
@@ -19,7 +19,7 @@ ENDIF()
############################################
SET(TOKUDB_VERSION "7.5.0")
-SET(TOKUDB_DEB_FILES "usr/lib/mysql/plugin/ha_tokudb.so\netc/mysql/conf.d/tokudb.cnf\nusr/bin/tokuftdump\nusr/share/doc/mariadb-server-5.5/README-TOKUDB\nusr/share/doc/mariadb-server-5.5/README.md" PARENT_SCOPE)
+SET(TOKUDB_DEB_FILES "usr/lib/mysql/plugin/ha_tokudb.so\netc/mysql/conf.d/tokudb.cnf\nusr/bin/tokuftdump\nusr/share/doc/mariadb-galera-server-5.5/README-TOKUDB\nusr/share/doc/mariadb-galera-server-5.5/README.md" PARENT_SCOPE)
SET(USE_BDB OFF CACHE BOOL "")
MARK_AS_ADVANCED(BUILDNAME)
MARK_AS_ADVANCED(BUILD_TESTING)
diff --git a/storage/xtradb/buf/buf0buf.c b/storage/xtradb/buf/buf0buf.c
index f8966e9b15c..44e93ae2122 100644
--- a/storage/xtradb/buf/buf0buf.c
+++ b/storage/xtradb/buf/buf0buf.c
@@ -2536,7 +2536,7 @@ loop:
#if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
loop2:
-#endif
+#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
if (block && buf_pool_watch_is_sentinel(buf_pool, &block->page)) {
mutex_exit(block_mutex);
block = NULL;
diff --git a/storage/xtradb/dict/dict0dict.c b/storage/xtradb/dict/dict0dict.c
index cf246bcdd5f..b7ffbabc22e 100644
--- a/storage/xtradb/dict/dict0dict.c
+++ b/storage/xtradb/dict/dict0dict.c
@@ -2801,7 +2801,26 @@ next_rec:
return(NULL);
}
-
+#ifdef WITH_WSREP
+/**********************************************************************//**
+Externally visible wrapper that forwards all of its arguments, unchanged,
+to dict_foreign_find_index().
+@return whatever dict_foreign_find_index() returns */
+dict_index_t*
+wsrep_dict_foreign_find_index(
+/*==========================*/
+	dict_table_t*	table,		/*!< in: table */
+	const char**	columns,	/*!< in: array of column names */
+	ulint		n_cols,		/*!< in: number of columns */
+	dict_index_t*	types_idx,	/*!< in: NULL or an index to whose
+					types the column types must match */
+	ibool		check_charsets,	/*!< in: whether to check charsets.
+					only has an effect if
+					types_idx != NULL */
+	ulint		check_null)	/*!< in: nonzero if none of the
+					columns must be declared NOT NULL */
+{
+	dict_index_t*	found;
+
+	found = dict_foreign_find_index(table, columns, n_cols,
+					types_idx, check_charsets,
+					check_null);
+
+	return(found);
+}
+#endif /* WITH_WSREP */
/**********************************************************************//**
Find an index that is equivalent to the one passed in and is not marked
for deletion.
diff --git a/storage/xtradb/handler/ha_innodb.cc b/storage/xtradb/handler/ha_innodb.cc
index c4f1e6d2b07..ad4b3045b48 100644
--- a/storage/xtradb/handler/ha_innodb.cc
+++ b/storage/xtradb/handler/ha_innodb.cc
@@ -103,6 +103,12 @@ extern "C" {
enum_tx_isolation thd_get_trx_isolation(const THD* thd);
+#ifdef WITH_WSREP
+#include "../storage/innobase/include/ut0byte.h"
+#ifndef EXTRA_DEBUG
+ //#include "../storage/innobase/include/ut0byte.ic"
+#endif /* EXTRA_DEBUG */
+#endif /* WITH_WSREP */
}
#include "ha_innodb.h"
@@ -120,6 +126,35 @@ extern ib_int64_t trx_sys_mysql_relay_log_pos;
# define MYSQL_PLUGIN_IMPORT /* nothing */
# endif /* MYSQL_PLUGIN_IMPORT */
+#ifdef WITH_WSREP
+#include <wsrep_mysqld.h>
+#include <my_md5.h>
+extern my_bool wsrep_certify_nonPK;
+class binlog_trx_data;
+extern handlerton *binlog_hton;
+
+extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log;
+extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_wsrep_rollback;
+extern MYSQL_PLUGIN_IMPORT mysql_cond_t COND_wsrep_rollback;
+extern MYSQL_PLUGIN_IMPORT wsrep_aborting_thd_t wsrep_aborting_thd;
+
+/* Convenience helper: passes the write-set handle obtained from
+wsrep_thd_ws_handle(thd), together with the InnoDB transaction id cast to
+wsrep_trx_id_t, to wsrep_ws_handle_for_trx() and returns that call's result.
+NOTE(review): presumably this binds the THD's write-set handle to the given
+InnoDB transaction - confirm against wsrep_ws_handle_for_trx(). */
+static inline wsrep_ws_handle_t*
+wsrep_ws_handle(THD* thd, const trx_t* trx) {
+ return wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd),
+ (wsrep_trx_id_t)trx->id);
+}
+
+extern bool wsrep_prepare_key_for_innodb(const uchar *cache_key,
+ size_t cache_key_len,
+ const uchar* row_id,
+ size_t row_id_len,
+ wsrep_buf_t* key,
+ size_t* key_len);
+
+extern handlerton * wsrep_hton;
+extern handlerton * binlog_hton;
+extern void wsrep_cleanup_transaction(THD *thd);
+#endif /* WITH_WSREP */
/** to protect innobase_open_files */
static mysql_mutex_t innobase_share_mutex;
static ulong commit_threads = 0;
@@ -1100,6 +1135,15 @@ thd_to_trx(
{
return(*(trx_t**) thd_ha_data(thd, innodb_hton_ptr));
}
+#ifdef WITH_WSREP
+/* Returns the id of the InnoDB transaction that thd_to_trx() yields for
+the given MySQL thread. The transaction pointer is dereferenced without a
+NULL check. */
+ulonglong
+thd_to_trx_id(
+/*==========*/
+	THD*	thd)	/*!< in: MySQL thread */
+{
+	const trx_t*	trx = thd_to_trx(thd);
+
+	return(trx->id);
+}
+#endif
my_bool
ha_innobase::is_fake_change_enabled(THD* thd)
@@ -1140,6 +1184,15 @@ innobase_release_temporary_latches(
return(0);
}
+#ifdef WITH_WSREP
+static int
+wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
+ my_bool signal);
+static void
+wsrep_fake_trx_id(handlerton* hton, THD *thd);
+static int innobase_wsrep_set_checkpoint(handlerton* hton, const XID* xid);
+static int innobase_wsrep_get_checkpoint(handlerton* hton, XID* xid);
+#endif
/********************************************************************//**
Increments innobase_active_counter and every INNOBASE_WAKE_INTERVALth
time calls srv_active_wake_master_thread. This function should be used
@@ -1556,6 +1609,9 @@ int
innobase_mysql_tmpfile(void)
/*========================*/
{
+#ifdef WITH_INNODB_DISALLOW_WRITES
+ os_event_wait(srv_allow_writes_event);
+#endif /* WITH_INNODB_DISALLOW_WRITES */
int fd2 = -1;
File fd;
@@ -2672,6 +2728,12 @@ innobase_init(
innobase_hton->flags=HTON_EXTENDED_KEYS;
innobase_hton->release_temporary_latches=innobase_release_temporary_latches;
innobase_hton->alter_table_flags = innobase_alter_table_flags;
+#ifdef WITH_WSREP
+ innobase_hton->wsrep_abort_transaction=wsrep_abort_transaction;
+ innobase_hton->wsrep_set_checkpoint=innobase_wsrep_set_checkpoint;
+ innobase_hton->wsrep_get_checkpoint=innobase_wsrep_get_checkpoint;
+ innobase_hton->wsrep_fake_trx_id=wsrep_fake_trx_id;
+#endif /* WITH_WSREP */
innobase_hton->kill_query = innobase_kill_query;
ut_a(DATA_MYSQL_TRUE_VARCHAR == (ulint)MYSQL_TYPE_VARCHAR);
@@ -3419,6 +3481,23 @@ innobase_commit_low(
/*================*/
trx_t* trx) /*!< in: transaction handle */
{
+#ifdef WITH_WSREP
+ THD* thd = (THD*)trx->mysql_thd;
+ const char* tmp = 0;
+ if (wsrep_on((void*)thd)) {
+#ifdef WSREP_PROC_INFO
+ char info[64];
+ info[sizeof(info) - 1] = '\0';
+ snprintf(info, sizeof(info) - 1,
+ "innobase_commit_low():trx_commit_for_mysql(%lld)",
+ (long long) wsrep_thd_trx_seqno(thd));
+ tmp = thd_proc_info(thd, info);
+
+#else
+ tmp = thd_proc_info(thd, "innobase_commit_low()");
+#endif /* WSREP_PROC_INFO */
+ }
+#endif /* WITH_WSREP */
if (trx_is_started(trx)) {
/* Save the current replication position for write to trx sys
@@ -3429,6 +3508,9 @@ innobase_commit_low(
trx_commit_for_mysql(trx);
}
+#ifdef WITH_WSREP
+ if (wsrep_on((void*)thd)) { thd_proc_info(thd, tmp); }
+#endif /* WITH_WSREP */
}
/*****************************************************************//**
@@ -4098,7 +4180,14 @@ ha_innobase::max_supported_key_length() const
therefore set to slightly less than 1 / 4 of page size which
is 16 kB; but currently MySQL does not work with keys whose
size is > MAX_KEY_LENGTH */
+#ifdef WITH_WSREP
+ /* this may look like obsolete code, but this ifdef is here
+ just to make sure we will see bzr merge conflict, if Oracle
+ changes max key length */
+ return(3500);
+#else
return(3500);
+#endif
}
/****************************************************************//**
@@ -5199,7 +5288,119 @@ innobase_mysql_cmp(
return(0);
}
+#ifdef WITH_WSREP
+/*************************************************************//**
+Rewrites a string-typed key field, in place, into the form produced by the
+collation's strnxfrm() handler. Non-string types are left untouched.
+
+With wsrep_protocol_version < 3 the transformed data is limited to
+str_length bytes and str_length is returned. With later protocol versions
+the transformed data may grow up to min(2 * str_length,
+REC_VERSION_56_MAX_INDEX_COL_LEN, buf_length) bytes and the length
+reported by strnxfrm() is returned.
+@return number of bytes of str that are valid after the call */
+extern "C" UNIV_INTERN
+int
+wsrep_innobase_mysql_sort(
+/*======================*/
+	int		mysql_type,	/* in: MySQL type */
+	uint		charset_number,	/* in: number of the charset */
+	unsigned char*	str,		/* in/out: data field; receives
+					the sort string */
+	unsigned int	str_length,	/* in: data field length,
+					not UNIV_SQL_NULL */
+	unsigned int	buf_length)	/* in: total str buffer length */
+{
+	CHARSET_INFO*		charset;
+	enum_field_types	mysql_tp;
+	int			ret_length = str_length;
+
+	DBUG_ASSERT(str_length != UNIV_SQL_NULL);
+
+	mysql_tp = (enum_field_types) mysql_type;
+
+	switch (mysql_tp) {
+
+	case MYSQL_TYPE_BIT:
+	case MYSQL_TYPE_STRING:
+	case MYSQL_TYPE_VAR_STRING:
+	case MYSQL_TYPE_TINY_BLOB:
+	case MYSQL_TYPE_MEDIUM_BLOB:
+	case MYSQL_TYPE_BLOB:
+	case MYSQL_TYPE_LONG_BLOB:
+	case MYSQL_TYPE_VARCHAR:
+	{
+		/* Scratch copy of the input: strnxfrm() reads from it
+		while writing its result back into str. */
+		uchar	tmp_str[REC_VERSION_56_MAX_INDEX_COL_LEN];
+		uint	tmp_length = REC_VERSION_56_MAX_INDEX_COL_LEN;
+
+		/* Use the charset number to pick the right charset struct
+		for the comparison. Since the MySQL function get_charset
+		may be slow before Bar removes the mutex operation there,
+		we first look at 2 common charsets directly. */
+
+		if (charset_number == default_charset_info->number) {
+			charset = default_charset_info;
+		} else if (charset_number == my_charset_latin1.number) {
+			charset = &my_charset_latin1;
+		} else {
+			charset = get_charset(charset_number, MYF(MY_WME));
+
+			if (charset == NULL) {
+				sql_print_error("InnoDB needs charset %lu for doing "
+						"a comparison, but MySQL cannot "
+						"find that charset.",
+						(ulong) charset_number);
+				ut_a(0);
+			}
+		}
+
+		ut_a(str_length <= tmp_length);
+		memcpy(tmp_str, str, str_length);
+
+		if (wsrep_protocol_version < 3) {
+			tmp_length = charset->coll->strnxfrm(
+				charset, str, str_length,
+				tmp_str, str_length);
+			DBUG_ASSERT(tmp_length <= str_length);
+		} else {
+			/* strnxfrm will expand the destination string,
+			protocols < 3 truncated the sorted string
+			protocols >= 3 get the full sorted string
+			*/
+			/* 5.5 strnxfrm pads the tail with spaces and
+			always returns the full destination buffer length
+			we cannot know how many characters were converted
+			using 2 * str length here as best guess
+			*/
+			uint dst_length = (str_length * 2 < tmp_length) ?
+				(str_length * 2) : tmp_length;
+
+			/* Never let strnxfrm() write past the caller's
+			buffer. Previously this was only verified by a
+			debug assertion after the write had happened. */
+			if (dst_length > buf_length) {
+				dst_length = buf_length;
+			}
+
+			tmp_length = charset->coll->strnxfrm(
+				charset, str, dst_length,
+				tmp_str, str_length);
+			DBUG_ASSERT(tmp_length <= buf_length);
+			ret_length = tmp_length;
+		}
+
+		break;
+	}
+	default:
+		/* Numeric, temporal, decimal, enum, set, geometry, NULL
+		and any other type: the data is left as it is. */
+		break;
+	}
+
+	return ret_length;
+}
+#endif // WITH_WSREP
/**************************************************************//**
Converts a MySQL type to an InnoDB type. Note that this function returns
the 'mtype' of InnoDB. InnoDB differentiates between MySQL's old <= 4.1
@@ -5353,6 +5554,268 @@ innobase_read_from_2_little_endian(
/*******************************************************************//**
Stores a key value for a row to a buffer.
@return key value length as stored in buff */
+#ifdef WITH_WSREP
+/* Builds, in buff, the key value of key number keynr for the given row.
+buff is zero-filled first; then each key part is appended in order:
+an optional NULL-indicator byte (only for nullable parts) followed by the
+part's data. Data of true VARCHAR, BLOB/TEXT/GEOMETRY and
+STRING/VAR_STRING parts is first passed through
+wsrep_innobase_mysql_sort(); all other parts are copied verbatim.
+*key_is_null is set to TRUE only if every key part is NULL.
+NOTE(review): the DBUG_ENTER tag below reads "store_key_val_for_row",
+not the wsrep_ name of this function.
+@return number of bytes used in buff */
+UNIV_INTERN
+uint
+wsrep_store_key_val_for_row(
+/*===============================*/
+ TABLE* table, /*!< in: MySQL table; key_info[keynr] describes the key */
+ uint keynr, /*!< in: key number */
+ char* buff, /*!< in/out: buffer for the key value (in MySQL
+ format) */
+ uint buff_len,/*!< in: buffer length */
+ const uchar* record, /*!< in: row from which the key parts are read */
+ ibool* key_is_null)/*!< out: full key was null */
+{
+ KEY* key_info = table->key_info + keynr;
+ KEY_PART_INFO* key_part = key_info->key_part;
+ KEY_PART_INFO* end = key_part + key_info->key_parts;
+ char* buff_start = buff;
+ enum_field_types mysql_type;
+ Field* field;
+
+ DBUG_ENTER("store_key_val_for_row");
+
+ bzero(buff, buff_len);
+ /* Assume an all-NULL key until a non-NULL part is seen */
+ *key_is_null = TRUE;
+
+ for (; key_part != end; key_part++) {
+ /* Scratch buffer handed to wsrep_innobase_mysql_sort().
+ NOTE(review): the memcpy() calls into it below copy up to
+ key_part->length bytes, so this assumes key_part->length <=
+ REC_VERSION_56_MAX_INDEX_COL_LEN - confirm. */
+ uchar sorted[REC_VERSION_56_MAX_INDEX_COL_LEN] = {'\0'};
+ ibool part_is_null = FALSE;
+
+ /* Nullable key parts are prefixed with one indicator byte:
+ 1 = NULL, 0 = not NULL */
+ if (key_part->null_bit) {
+ if (record[key_part->null_offset] &
+ key_part->null_bit) {
+ *buff = 1;
+ part_is_null = TRUE;
+ } else {
+ *buff = 0;
+ }
+ buff++;
+ }
+ if (!part_is_null) *key_is_null = FALSE;
+
+ field = key_part->field;
+ mysql_type = field->type();
+
+ if (mysql_type == MYSQL_TYPE_VARCHAR) {
+ /* >= 5.0.3 true VARCHAR */
+ ulint lenlen;
+ ulint len;
+ const byte* data;
+ ulint key_len;
+ ulint true_len;
+ CHARSET_INFO* cs;
+ int error=0;
+
+ key_len = key_part->length;
+
+ /* NOTE(review): a NULL part skips key_len + 2 bytes, whereas
+ the non-NULL path below advances by true_len or key_len only
+ and stores no length bytes - confirm this asymmetry is
+ intended. */
+ if (part_is_null) {
+ buff += key_len + 2;
+
+ continue;
+ }
+ cs = field->charset();
+
+ lenlen = (ulint)
+ (((Field_varstring*)field)->length_bytes);
+
+ data = row_mysql_read_true_varchar(&len,
+ (byte*) (record
+ + (ulint)get_field_offset(table, field)),
+ lenlen);
+
+ true_len = len;
+
+ /* For multi byte character sets we need to calculate
+ the true length of the key */
+
+ if (len > 0 && cs->mbmaxlen > 1) {
+ true_len = (ulint) cs->cset->well_formed_len(cs,
+ (const char *) data,
+ (const char *) data + len,
+ (uint) (key_len /
+ cs->mbmaxlen),
+ &error);
+ }
+
+ /* In a column prefix index, we may need to truncate
+ the stored value: */
+
+ if (true_len > key_len) {
+ true_len = key_len;
+ }
+
+ memcpy(sorted, data, true_len);
+ true_len = wsrep_innobase_mysql_sort(
+ mysql_type, cs->number, sorted, true_len,
+ REC_VERSION_56_MAX_INDEX_COL_LEN);
+
+ /* With wsrep_protocol_version <= 1 nothing is copied into
+ buff for this part: the key_len bytes skipped keep the zeros
+ written by bzero() above. */
+ if (wsrep_protocol_version > 1) {
+ memcpy(buff, sorted, true_len);
+ /* Note that we always reserve the maximum possible
+ length of the true VARCHAR in the key value, though
+ only len first bytes after the 2 length bytes contain
+ actual data. The rest of the space was reset to zero
+ in the bzero() call above. */
+ buff += true_len;
+ } else {
+ buff += key_len;
+ }
+ } else if (mysql_type == MYSQL_TYPE_TINY_BLOB
+ || mysql_type == MYSQL_TYPE_MEDIUM_BLOB
+ || mysql_type == MYSQL_TYPE_BLOB
+ || mysql_type == MYSQL_TYPE_LONG_BLOB
+ /* MYSQL_TYPE_GEOMETRY data is treated
+ as BLOB data in innodb. */
+ || mysql_type == MYSQL_TYPE_GEOMETRY) {
+
+ CHARSET_INFO* cs;
+ ulint key_len;
+ ulint true_len;
+ int error=0;
+ ulint blob_len;
+ const byte* blob_data;
+
+ ut_a(key_part->key_part_flag & HA_PART_KEY_SEG);
+
+ key_len = key_part->length;
+
+ /* NOTE(review): same key_len + 2 skip for NULL parts as in the
+ VARCHAR branch above - confirm. */
+ if (part_is_null) {
+ buff += key_len + 2;
+
+ continue;
+ }
+
+ cs = field->charset();
+
+ blob_data = row_mysql_read_blob_ref(&blob_len,
+ (byte*) (record
+ + (ulint)get_field_offset(table, field)),
+ (ulint) field->pack_length());
+
+ true_len = blob_len;
+
+ ut_a(get_field_offset(table, field)
+ == key_part->offset);
+
+ /* For multi byte character sets we need to calculate
+ the true length of the key */
+
+ if (blob_len > 0 && cs->mbmaxlen > 1) {
+ true_len = (ulint) cs->cset->well_formed_len(cs,
+ (const char *) blob_data,
+ (const char *) blob_data
+ + blob_len,
+ (uint) (key_len /
+ cs->mbmaxlen),
+ &error);
+ }
+
+ /* All indexes on BLOB and TEXT are column prefix
+ indexes, and we may need to truncate the data to be
+ stored in the key value: */
+
+ if (true_len > key_len) {
+ true_len = key_len;
+ }
+
+ memcpy(sorted, blob_data, true_len);
+ true_len = wsrep_innobase_mysql_sort(
+ mysql_type, cs->number, sorted, true_len,
+ REC_VERSION_56_MAX_INDEX_COL_LEN);
+
+ /* Unlike the VARCHAR branch, the sorted data is copied into
+ buff for every protocol version; only the amount by which
+ buff advances depends on the version. */
+ memcpy(buff, sorted, true_len);
+
+ /* Note that we always reserve the maximum possible
+ length of the BLOB prefix in the key value. */
+ if (wsrep_protocol_version > 1) {
+ buff += true_len;
+ } else {
+ buff += key_len;
+ }
+ } else {
+ /* Here we handle all other data types except the
+ true VARCHAR, BLOB and TEXT. Note that the column
+ value we store may be also in a column prefix
+ index. */
+
+ CHARSET_INFO* cs;
+ ulint true_len;
+ ulint key_len;
+ const uchar* src_start;
+ int error=0;
+ enum_field_types real_type;
+
+ key_len = key_part->length;
+
+ if (part_is_null) {
+ buff += key_len;
+
+ continue;
+ }
+
+ src_start = record + key_part->offset;
+ real_type = field->real_type();
+ true_len = key_len;
+
+ /* Character set for the field is defined only
+ to fields whose type is string and real field
+ type is not enum or set. For these fields check
+ if character set is multi byte. */
+
+ if (real_type != MYSQL_TYPE_ENUM
+ && real_type != MYSQL_TYPE_SET
+ && ( mysql_type == MYSQL_TYPE_VAR_STRING
+ || mysql_type == MYSQL_TYPE_STRING)) {
+
+ cs = field->charset();
+
+ /* For multi byte character sets we need to
+ calculate the true length of the key */
+
+ if (key_len > 0 && cs->mbmaxlen > 1) {
+
+ true_len = (ulint)
+ cs->cset->well_formed_len(cs,
+ (const char *)src_start,
+ (const char *)src_start
+ + key_len,
+ (uint) (key_len /
+ cs->mbmaxlen),
+ &error);
+ }
+ memcpy(sorted, src_start, true_len);
+ true_len = wsrep_innobase_mysql_sort(
+ mysql_type, cs->number, sorted, true_len,
+ REC_VERSION_56_MAX_INDEX_COL_LEN);
+
+ memcpy(buff, sorted, true_len);
+ } else {
+ /* Non-string part: copied byte for byte */
+ memcpy(buff, src_start, true_len);
+ }
+ buff += true_len;
+
+ /* Pad the unused space with spaces. */
+
+ /* The padding code below is compiled out (REMOVED is not defined
+ in this function). Note that cs is only assigned in the string
+ branch above. */
+#ifdef REMOVED
+ if (true_len < key_len) {
+ ulint pad_len = key_len - true_len;
+ ut_a(!(pad_len % cs->mbminlen));
+
+ cs->cset->fill(cs, buff, pad_len,
+ 0x20 /* space */);
+ buff += pad_len;
+ }
+#endif /* REMOVED */
+ }
+ }
+
+ /* NOTE(review): this is the only bounds check on buff and it runs after
+ all writes have been made; the caller must size buff for the whole key. */
+ ut_a(buff <= buff_start + buff_len);
+
+ DBUG_RETURN((uint)(buff - buff_start));
+}
+#endif /* WITH_WSREP */
UNIV_INTERN
uint
ha_innobase::store_key_val_for_row(
@@ -6163,6 +6626,9 @@ ha_innobase::write_row(
ulint error = 0;
int error_result= 0;
ibool auto_inc_used= FALSE;
+#ifdef WITH_WSREP
+ ibool auto_inc_inserted= FALSE; /* if NULL was inserted */
+#endif
ulint sql_command;
trx_t* trx = thd_to_trx(user_thd);
@@ -6197,8 +6663,20 @@ ha_innobase::write_row(
if ((sql_command == SQLCOM_ALTER_TABLE
|| sql_command == SQLCOM_OPTIMIZE
|| sql_command == SQLCOM_CREATE_INDEX
+#ifdef WITH_WSREP
+ || (wsrep_on(user_thd) && wsrep_load_data_splitting &&
+ sql_command == SQLCOM_LOAD &&
+ !thd_test_options(
+ user_thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
+#endif /* WITH_WSREP */
|| sql_command == SQLCOM_DROP_INDEX)
&& num_write_row >= 10000) {
+#ifdef WITH_WSREP
+ if (wsrep_on(user_thd) && sql_command == SQLCOM_LOAD) {
+ WSREP_DEBUG("forced trx split for LOAD: %s",
+ wsrep_thd_query(user_thd));
+ }
+#endif /* WITH_WSREP */
/* ALTER TABLE is COMMITted at every 10000 copied rows.
The IX table lock for the original table has to be re-issued.
As this method will be called on a temporary table where the
@@ -6232,6 +6710,21 @@ no_commit:
*/
;
} else if (src_table == prebuilt->table) {
+#ifdef WITH_WSREP
+ switch (wsrep_run_wsrep_commit(user_thd, wsrep_hton, 1))
+ {
+ case WSREP_TRX_OK:
+ break;
+ case WSREP_TRX_SIZE_EXCEEDED:
+ case WSREP_TRX_CERT_FAIL:
+ case WSREP_TRX_ERROR:
+ DBUG_RETURN(1);
+ }
+
+ if (binlog_hton->commit(binlog_hton, user_thd, 1))
+ DBUG_RETURN(1);
+ wsrep_post_commit(user_thd, TRUE);
+#endif /* WITH_WSREP */
/* Source table is not in InnoDB format:
no need to re-acquire locks on it. */
@@ -6242,6 +6735,20 @@ no_commit:
/* We will need an IX lock on the destination table. */
prebuilt->sql_stat_start = TRUE;
} else {
+#ifdef WITH_WSREP
+ switch (wsrep_run_wsrep_commit(user_thd, wsrep_hton, 1))
+ {
+ case WSREP_TRX_OK:
+ break;
+ case WSREP_TRX_SIZE_EXCEEDED:
+ case WSREP_TRX_CERT_FAIL:
+ case WSREP_TRX_ERROR:
+ DBUG_RETURN(1);
+ }
+ if (binlog_hton->commit(binlog_hton, user_thd, 1))
+ DBUG_RETURN(1);
+ wsrep_post_commit(user_thd, TRUE);
+#endif /* WITH_WSREP */
/* Ensure that there are no other table locks than
LOCK_IX and LOCK_AUTO_INC on the destination table. */
@@ -6270,7 +6777,9 @@ no_commit:
/* Reset the error code before calling
innobase_get_auto_increment(). */
prebuilt->autoinc_error = DB_SUCCESS;
-
+#ifdef WITH_WSREP
+ auto_inc_inserted= (table->next_number_field->val_int() == 0);
+#endif
if ((error = update_auto_increment())) {
/* We don't want to mask autoinc overflow errors. */
@@ -6359,6 +6868,30 @@ no_commit:
case SQLCOM_REPLACE_SELECT:
goto set_max_autoinc;
+#ifdef WITH_WSREP
+ /* workaround for LP bug #355000, retrying the insert */
+ case SQLCOM_INSERT:
+ if (wsrep_on(current_thd) &&
+ auto_inc_inserted &&
+ wsrep_drupal_282555_workaround &&
+ !thd_test_options(current_thd,
+ OPTION_NOT_AUTOCOMMIT |
+ OPTION_BEGIN)) {
+ WSREP_DEBUG(
+ "retrying insert: %s",
+ (*wsrep_thd_query(current_thd)) ?
+ wsrep_thd_query(current_thd) :
+ (char *)"void");
+ error= DB_SUCCESS;
+ wsrep_thd_set_conflict_state(
+ current_thd, MUST_ABORT);
+ innodb_srv_conc_exit_innodb(prebuilt->trx);
+ /* jump straight to func exit over
+ * later wsrep hooks */
+ goto func_exit;
+ }
+ break;
+#endif
default:
break;
}
@@ -6407,6 +6940,20 @@ report_error:
error_result = convert_error_code_to_mysql((int) error,
prebuilt->table->flags,
user_thd);
+#ifdef WITH_WSREP
+ if (!error_result && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
+ wsrep_on(user_thd) && !wsrep_consistency_check(user_thd) &&
+ (sql_command != SQLCOM_LOAD ||
+ thd_binlog_format(user_thd) == BINLOG_FORMAT_ROW)) {
+
+ if (wsrep_append_keys(user_thd, false, record, NULL)) {
+ DBUG_PRINT("wsrep", ("row key failed"));
+ error_result = HA_ERR_INTERNAL_ERROR;
+ goto wsrep_error;
+ }
+ }
+wsrep_error:
+#endif
func_exit:
innobase_active_small();
@@ -6570,7 +7117,84 @@ calc_row_difference(
return(0);
}
+#ifdef WITH_WSREP
+/* Computes an MD5 digest over all fields of a row in MySQL format.
+For each field one marker byte is hashed: 0 when the field is NULL in the
+row, otherwise 1 followed by the field's data bytes. For DATA_BLOB columns
+and for true VARCHAR fields the data pointer and length are resolved via
+row_mysql_read_blob_ref() / row_mysql_read_true_varchar(); for all other
+columns pack_length() bytes at the field's offset are hashed.
+The thd parameter is not used.
+@return always 0 */
+static
+int
+wsrep_calc_row_hash(
+/*================*/
+	byte*		digest,		/*!< in/out: md5 sum */
+	const uchar*	row,		/*!< in: row in MySQL format */
+	TABLE*		table,		/*!< in: table in MySQL data
+					dictionary */
+	row_prebuilt_t*	prebuilt,	/*!< in: InnoDB prebuilt struct */
+	THD*		thd)		/*!< in: user thread */
+{
+	my_MD5Context	md5;
+	byte		null_marker = 0;
+	byte		value_marker = 1;
+
+	my_MD5Init (&md5);
+
+	const uint	field_count = table->s->fields;
+
+	for (uint col = 0; col < field_count; col++) {
+		Field*			fld = table->field[col];
+		const byte*		data = (const byte*) row
+			+ get_field_offset(table, fld);
+		ulint			data_len = fld->pack_length();
+		const enum_field_types	fld_type = fld->type();
+		const ulint		mtype =
+			prebuilt->table->cols[col].mtype;
+
+		if (mtype == DATA_BLOB) {
+			/* Follow the BLOB reference to the real data */
+			data = row_mysql_read_blob_ref(
+				&data_len, data, data_len);
+		} else if ((mtype == DATA_VARCHAR
+			    || mtype == DATA_BINARY
+			    || mtype == DATA_VARMYSQL)
+			   && fld_type == MYSQL_TYPE_VARCHAR) {
+			/* >= 5.0.3 true VARCHAR: the payload length is
+			stored in 1 or 2 bytes in front of the data */
+			data = row_mysql_read_true_varchar(
+				&data_len, data,
+				(ulint)
+				(((Field_varstring*) fld)->length_bytes));
+		}
+
+		const bool	is_null = fld->null_ptr
+			&& field_in_record_is_null(table, fld, (char*) row);
+
+		if (is_null) {
+			my_MD5Update (&md5, &null_marker, 1);
+			continue;
+		}
+
+		my_MD5Update (&md5, &value_marker, 1);
+		my_MD5Update (&md5, data, data_len);
+	}
+
+	my_MD5Final (digest, &md5);
+
+	return(0);
+}
+#endif /* WITH_WSREP */
/**********************************************************************//**
Updates a row given as a parameter to a new value. Note that we are given
whole rows, not just the fields which are updated: this incurs some
@@ -6710,6 +7334,20 @@ ha_innobase::update_row(
DBUG_RETURN(HA_ERR_CRASHED);
}
+#ifdef WITH_WSREP
+ if (!error && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
+ wsrep_on(user_thd)) {
+
+ DBUG_PRINT("wsrep", ("update row key"));
+
+ if (wsrep_append_keys(user_thd, false, old_row, new_row)) {
+ DBUG_PRINT("wsrep", ("row key failed"));
+ error = HA_ERR_INTERNAL_ERROR;
+ goto wsrep_error;
+ }
+ }
+wsrep_error:
+#endif
DBUG_RETURN(error);
}
@@ -6767,6 +7405,18 @@ ha_innobase::delete_row(
DBUG_RETURN(HA_ERR_CRASHED);
}
+#ifdef WITH_WSREP
+ if (!error && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
+ wsrep_on(user_thd)) {
+
+ if (wsrep_append_keys(user_thd, false, record, NULL)) {
+ DBUG_PRINT("wsrep", ("delete fail"));
+ error = HA_ERR_INTERNAL_ERROR;
+ goto wsrep_error;
+ }
+ }
+wsrep_error:
+#endif
DBUG_RETURN(error);
}
@@ -7593,7 +8243,393 @@ ha_innobase::rnd_pos(
DBUG_RETURN(error);
}
+#ifdef WITH_WSREP
+extern "C" {
+dict_index_t*
+wsrep_dict_foreign_find_index(
+ dict_table_t* table,
+ const char** columns,
+ ulint n_cols,
+ dict_index_t* types_idx,
+ ibool check_charsets,
+ ulint check_null);
+
+/*******************************************************************//**
+Builds a write set key for one side of a foreign key constraint and
+appends it with wsrep->append_key().
+
+Nothing is appended, and DB_SUCCESS is returned, when wsrep is off for the
+thread or the thread is not in LOCAL_STATE. When the table on the requested
+side of the constraint is not attached to 'foreign' yet, it is looked up
+with dict_table_get_low() under dict_sys->mutex and the matching index is
+resolved with wsrep_dict_foreign_find_index().
+
+The key consists of one byte holding the ordinal of the target index
+followed by the value extracted by wsrep_rec_get_foreign_key().
+@return DB_SUCCESS, DB_ERROR when an object is missing or the key cannot
+be prepared/appended, or the error returned by wsrep_rec_get_foreign_key() */
+ulint
+wsrep_append_foreign_key(
+/*===========================*/
+	trx_t*		trx,		/*!< in: trx */
+	dict_foreign_t*	foreign,	/*!< in: foreign key constraint */
+	const rec_t*	rec,		/*!<in: clustered index record */
+	dict_index_t*	index,		/*!<in: clustered index */
+	ibool		referenced,	/*!<in: is check for referenced table */
+	ibool		shared)		/*!<in: is shared access */
+{
+	ut_a(trx);
+	THD*	thd = (THD*)trx->mysql_thd;
+	ulint	rcode = DB_SUCCESS;
+	char	cache_key[513] = {'\0'};
+	int	cache_key_len;
+	bool const copy = true;
+
+	/* NOTE(review): thd reaches wsrep_thd_exec_mode() before the !thd
+	test below; presumably wsrep_on() already rejects a NULL thread
+	handle -- confirm. */
+	if (!wsrep_on(trx->mysql_thd) ||
+	    wsrep_thd_exec_mode(thd) != LOCAL_STATE)
+		return DB_SUCCESS;
+
+	if (!thd || !foreign ||
+	    (!foreign->referenced_table && !foreign->foreign_table))
+	{
+		WSREP_INFO("FK: %s missing in: %s",
+			(!thd) ? "thread" :
+			((!foreign) ? "constraint" :
+			((!foreign->referenced_table) ?
+			 "referenced table" : "foreign table")),
+			(thd && wsrep_thd_query(thd)) ?
+			wsrep_thd_query(thd) : "void");
+		return DB_ERROR;
+	}
+
+	/* Attach the table (and its index) on the requested side of the
+	constraint if it is not attached yet. */
+	if ( !((referenced) ?
+		foreign->referenced_table : foreign->foreign_table))
+	{
+		WSREP_DEBUG("pulling %s table into cache",
+			    (referenced) ? "referenced" : "foreign");
+		mutex_enter(&(dict_sys->mutex));
+		if (referenced)
+		{
+			foreign->referenced_table =
+				dict_table_get_low(
+					foreign->referenced_table_name_lookup, DICT_ERR_IGNORE_NONE);
+			if (foreign->referenced_table)
+			{
+				foreign->referenced_index =
+					wsrep_dict_foreign_find_index(
+						foreign->referenced_table,
+						foreign->referenced_col_names,
+						foreign->n_fields,
+						foreign->foreign_index,
+						TRUE, FALSE);
+			}
+		}
+		else
+		{
+			foreign->foreign_table =
+				dict_table_get_low(
+					foreign->foreign_table_name_lookup, DICT_ERR_IGNORE_NONE);
+			if (foreign->foreign_table)
+			{
+				foreign->foreign_index =
+					wsrep_dict_foreign_find_index(
+						foreign->foreign_table,
+						foreign->foreign_col_names,
+						foreign->n_fields,
+						foreign->referenced_index,
+						TRUE, FALSE);
+			}
+		}
+		mutex_exit(&(dict_sys->mutex));
+	}
+
+	if ( !((referenced) ?
+		foreign->referenced_table : foreign->foreign_table))
+	{
+		WSREP_WARN("FK: %s missing in query: %s",
+			   (!foreign->referenced_table) ?
+			   "referenced table" : "foreign table",
+			   (wsrep_thd_query(thd)) ?
+			   wsrep_thd_query(thd) : "void");
+		return DB_ERROR;
+	}
+	byte	key[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+	ulint	len = WSREP_MAX_SUPPORTED_KEY_LENGTH;
+
+	dict_index_t *idx_target = (referenced) ?
+		foreign->referenced_index : index;
+	dict_index_t *idx = (referenced) ?
+		UT_LIST_GET_FIRST(foreign->referenced_table->indexes) :
+		UT_LIST_GET_FIRST(foreign->foreign_table->indexes);
+
+	/* key[0] is the position of the target index in the table's index
+	list, not counting indexes named innobase_index_reserve_name. */
+	int i = 0;
+	while (idx != NULL && idx != idx_target) {
+		if (innobase_strcasecmp (idx->name, innobase_index_reserve_name) != 0) {
+			i++;
+		}
+		idx = UT_LIST_GET_NEXT(indexes, idx);
+	}
+	ut_a(idx);
+	key[0] = (char)i;
+
+	rcode = wsrep_rec_get_foreign_key(
+		&key[1], &len, rec, index, idx,
+		wsrep_protocol_version > 1);
+	if (rcode != DB_SUCCESS) {
+		/* The query string may be NULL; never hand NULL to %s. */
+		WSREP_ERROR(
+			"FK key set failed: %lu (%lu %lu), index: %s %s, %s",
+			rcode, referenced, shared,
+			(index && index->name) ? index->name :
+				"void index",
+			(index && index->table_name) ? index->table_name :
+				"void table",
+			(wsrep_thd_query(thd)) ?
+			wsrep_thd_query(thd) : "void");
+		return rcode;
+	}
+
+	/* With protocol version > 1 the key is named after the table on
+	the requested side, otherwise always after the foreign table. Only
+	the requested side is known to be attached at this point, so the
+	foreign table can still be missing in the latter case. */
+	dict_table_t* key_table =
+		(wsrep_protocol_version > 1 && referenced) ?
+		foreign->referenced_table : foreign->foreign_table;
+	if (!key_table) {
+		WSREP_WARN("FK: %s missing in query: %s",
+			   "foreign table",
+			   (wsrep_thd_query(thd)) ?
+			   wsrep_thd_query(thd) : "void");
+		return DB_ERROR;
+	}
+
+	/* cache_key is zero-initialized and at most sizeof - 1 bytes are
+	copied, so it stays NUL-terminated. */
+	strncpy(cache_key, key_table->name, sizeof(cache_key) - 1);
+	cache_key_len = strlen(cache_key);
+#ifdef WSREP_DEBUG_PRINT
+	ulint j;
+	fprintf(stderr, "FK parent key, table: %s %s len: %lu ",
+		cache_key, (shared) ? "shared" : "exclusive", len+1);
+	for (j=0; j<len+1; j++) {
+		fprintf(stderr, " %hhX, ", key[j]);
+	}
+	fprintf(stderr, "\n");
+#endif
+	/* The length was taken before the '/' is replaced, so
+	cache_key_len + 1 below spans both NUL-terminated name parts
+	(presumably mirroring TABLE_SHARE::table_cache_key -- confirm). */
+	char *p = strchr(cache_key, '/');
+	if (p) {
+		*p = '\0';
+	} else {
+		/* Either table pointer may be NULL here; only the one on
+		the requested side has been verified. */
+		WSREP_WARN("unexpected foreign key table %s %s",
+			   (foreign->referenced_table) ?
+			   foreign->referenced_table->name : "void",
+			   (foreign->foreign_table) ?
+			   foreign->foreign_table->name : "void");
+	}
+
+	wsrep_buf_t wkey_part[3];
+	wsrep_key_t wkey = {wkey_part, 3};
+	if (!wsrep_prepare_key_for_innodb(
+		(const uchar*)cache_key,
+		cache_key_len + 1,
+		(const uchar*)key, len+1,
+		wkey_part,
+		&wkey.key_parts_num)) {
+		WSREP_WARN("key prepare failed for cascaded FK: %s",
+			   (wsrep_thd_query(thd)) ?
+			   wsrep_thd_query(thd) : "void");
+		return DB_ERROR;
+	}
+	rcode = (int)wsrep->append_key(
+		wsrep,
+		wsrep_ws_handle(thd, trx),
+		&wkey,
+		1,
+		shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE,
+		copy);
+	if (rcode) {
+		DBUG_PRINT("wsrep", ("row key failed: %lu", rcode));
+		WSREP_ERROR("Appending cascaded fk row key failed: %s, %lu",
+			    (wsrep_thd_query(thd)) ?
+			    wsrep_thd_query(thd) : "void", rcode);
+		return DB_ERROR;
+	}
+
+	return DB_SUCCESS;
+}
+}
+
+/** Prepares a single key from the table cache key and the given key value
+and appends it to the wsrep write set of the transaction.
+The 'table' argument is not used by this function.
+@return 0 on success, HA_ERR_INTERNAL_ERROR when the key cannot be
+prepared, otherwise the non-zero status of wsrep->append_key() */
+static int
+wsrep_append_key(
+/*==================*/
+	THD		*thd,
+	trx_t		*trx,
+	TABLE_SHARE	*table_share,
+	TABLE		*table,
+	const char*	key,
+	uint16_t	key_len,
+	bool		shared
+)
+{
+	DBUG_ENTER("wsrep_append_key");
+
+	bool const	copy_key_data = true;
+
+#ifdef WSREP_DEBUG_PRINT
+	fprintf(stderr, "%s conn %ld, trx %llu, keylen %d, table %s ",
+		(shared) ? "Shared" : "Exclusive",
+		wsrep_thd_thread_id(thd), trx->id, key_len,
+		table_share->table_name.str);
+	for (int pos = 0; pos < key_len; pos++) {
+		fprintf(stderr, "%hhX, ", key[pos]);
+	}
+	fprintf(stderr, "\n");
+#endif
+	wsrep_buf_t	parts[3];
+	wsrep_key_t	ws_key = {parts, 3};
+
+	if (!wsrep_prepare_key_for_innodb(
+		    (const uchar*)table_share->table_cache_key.str,
+		    table_share->table_cache_key.length,
+		    (const uchar*)key, key_len,
+		    parts,
+		    &ws_key.key_parts_num)) {
+		WSREP_WARN("key prepare failed for: %s",
+			   (wsrep_thd_query(thd)) ?
+			   wsrep_thd_query(thd) : "void");
+		DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+	}
+
+	int status = (int)wsrep->append_key(
+		wsrep,
+		wsrep_ws_handle(thd, trx),
+		&ws_key,
+		1,
+		shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE,
+		copy_key_data);
+
+	if (status == 0) {
+		DBUG_RETURN(0);
+	}
+
+	/* The provider refused the key: trace it, log it and pass the
+	provider status on to the caller. */
+	DBUG_PRINT("wsrep", ("row key failed: %d", status));
+	WSREP_WARN("Appending row key failed: %s, %d",
+		   (wsrep_thd_query(thd)) ?
+		   wsrep_thd_query(thd) : "void", status);
+	DBUG_RETURN(status);
+}
+/** Appends the key values of a row (and optionally of a second row image)
+to the wsrep write set.
+
+Rows of temporary tables are skipped. With wsrep_protocol_version 0 only
+the value of key 0 of record0 is appended. Otherwise every key that is
+unique, or referenced by a foreign key constraint, or any key when the
+table has no unique key at all, is appended with the key number stored in
+the first byte; for record1 a key is appended only when its value differs
+from the one of record0. NULL key values are skipped. When no key was
+accounted for and wsrep_certify_nonPK is set, a 16 byte hash of the full
+row is appended as the key instead.
+@return 0 on success, otherwise the error returned by wsrep_append_key() */
+int
+ha_innobase::wsrep_append_keys(
+/*==================*/
+	THD		*thd,
+	bool		shared,
+	const uchar*	record0,	/* in: row in MySQL format */
+	const uchar*	record1)	/* in: row in MySQL format */
+{
+	DBUG_ENTER("wsrep_append_keys");
+
+	bool key_appended = false;
+	trx_t *trx = thd_to_trx(thd);
+
+	if (table_share && table_share->tmp_table != NO_TMP_TABLE) {
+		WSREP_DEBUG("skipping tmp table DML: THD: %lu tmp: %d SQL: %s",
+			    wsrep_thd_thread_id(thd),
+			    table_share->tmp_table,
+			    (wsrep_thd_query(thd)) ?
+			    wsrep_thd_query(thd) : "void");
+		DBUG_RETURN(0);
+	}
+
+	if (wsrep_protocol_version == 0) {
+		uint	len;
+		char	keyval[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+		char	*key = &keyval[0];
+		ibool	is_null;
+
+		len = wsrep_store_key_val_for_row(
+			table, 0, key, WSREP_MAX_SUPPORTED_KEY_LENGTH,
+			record0, &is_null);
+
+		/* NOTE(review): key_appended is not set on this path, so
+		the row hash below is appended as well when
+		wsrep_certify_nonPK is on -- confirm this is intended. */
+		if (!is_null) {
+			int rcode = wsrep_append_key(
+				thd, trx, table_share, table, keyval,
+				len, shared);
+			if (rcode) DBUG_RETURN(rcode);
+		}
+		else
+		{
+			/* The query string may be NULL; never hand NULL
+			to %s. */
+			WSREP_DEBUG("NULL key skipped (proto 0): %s",
+				    (wsrep_thd_query(thd)) ?
+				    wsrep_thd_query(thd) : "void");
+		}
+	} else {
+		/* The key number has to fit into the one byte key prefix. */
+		ut_a(table->s->keys <= 256);
+		uint i;
+		bool hasPK= false;
+
+		/* Any unique key counts as a "PK" here; a unique key other
+		than the primary one makes the thread not PA safe. */
+		for (i=0; i<table->s->keys; ++i) {
+			KEY* key_info = table->key_info + i;
+			if (key_info->flags & HA_NOSAME) {
+				hasPK = true;
+				if (i != table->s->primary_key) {
+					wsrep_thd_set_PA_safe(thd, FALSE);
+				}
+			}
+		}
+
+		for (i=0; i<table->s->keys; ++i) {
+			uint	len;
+			char	keyval0[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+			char	keyval1[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+			char*	key0 = &keyval0[1];
+			char*	key1 = &keyval1[1];
+			KEY*	key_info = table->key_info + i;
+			ibool	is_null;
+
+			dict_index_t* idx = innobase_get_index(i);
+			dict_table_t* tab = (idx) ? idx->table : NULL;
+
+			/* byte 0 carries the key number, the value follows */
+			keyval0[0] = (char)i;
+			keyval1[0] = (char)i;
+
+			if (!tab) {
+				WSREP_WARN("MySQL-InnoDB key mismatch %s %s",
+					   table->s->table_name.str,
+					   key_info->name);
+			}
+			/* !hasPK == table with no PK, must append all non-unique keys */
+			if (!hasPK || key_info->flags & HA_NOSAME ||
+			    ((tab &&
+			      dict_table_get_referenced_constraint(tab, idx)) ||
+			     (!tab && referenced_by_foreign_key()))) {
+
+				len = wsrep_store_key_val_for_row(
+					table, i, key0,
+					WSREP_MAX_SUPPORTED_KEY_LENGTH,
+					record0, &is_null);
+				if (!is_null) {
+					int rcode = wsrep_append_key(
+						thd, trx, table_share, table,
+						keyval0, len+1, shared);
+					if (rcode) DBUG_RETURN(rcode);
+
+					if (key_info->flags & HA_NOSAME || shared)
+						key_appended = true;
+				}
+				else
+				{
+					/* The query string may be NULL;
+					never hand NULL to %s. */
+					WSREP_DEBUG("NULL key skipped: %s",
+						    (wsrep_thd_query(thd)) ?
+						    wsrep_thd_query(thd) : "void");
+				}
+				if (record1) {
+					len = wsrep_store_key_val_for_row(
+						table, i, key1,
+						WSREP_MAX_SUPPORTED_KEY_LENGTH,
+						record1, &is_null);
+					/* NOTE(review): only the length of
+					the record1 key is compared. */
+					if (!is_null && memcmp(key0, key1, len)) {
+						int rcode = wsrep_append_key(
+							thd, trx, table_share,
+							table,
+							keyval1, len+1, shared);
+						if (rcode) DBUG_RETURN(rcode);
+					}
+				}
+			}
+		}
+	}
+
+	/* if no PK, calculate hash of full row, to be the key value */
+	if (!key_appended && wsrep_certify_nonPK) {
+		uchar digest[16];
+		int rcode;
+
+		wsrep_calc_row_hash(digest, record0, table, prebuilt, thd);
+		if ((rcode = wsrep_append_key(thd, trx, table_share, table,
+					      (const char*) digest, 16,
+					      shared))) {
+			DBUG_RETURN(rcode);
+		}
+
+		if (record1) {
+			wsrep_calc_row_hash(
+				digest, record1, table, prebuilt, thd);
+			if ((rcode = wsrep_append_key(thd, trx, table_share,
+						      table,
+						      (const char*) digest,
+						      16, shared))) {
+				DBUG_RETURN(rcode);
+			}
+		}
+	}
+
+	DBUG_RETURN(0);
+}
+#endif
/*********************************************************************//**
Stores a reference to the current row to 'ref' field of the handle. Note
that in the case where we have generated the clustered index for the
@@ -10571,11 +11607,18 @@ ha_innobase::external_lock(
/* used by test case */
DBUG_EXECUTE_IF("no_innodb_binlog_errors", skip = 1;);
if (!skip) {
+#ifdef WITH_WSREP
+ if (!wsrep_on(thd) || wsrep_thd_exec_mode(thd) == LOCAL_STATE)
+ {
+#endif /* WITH_WSREP */
my_error(ER_BINLOG_STMT_MODE_AND_ROW_ENGINE, MYF(0),
" InnoDB is limited to row-logging when "
"transaction isolation level is "
"READ COMMITTED or READ UNCOMMITTED.");
DBUG_RETURN(HA_ERR_LOGGING_IMPOSSIBLE);
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
}
}
@@ -11848,6 +12891,9 @@ innobase_xa_prepare(
to the session variable take effect only in the next transaction */
if (!trx->support_xa) {
+#ifdef WITH_WSREP
+ thd_get_xid(thd, (MYSQL_XID*) &trx->xid);
+#endif // WITH_WSREP
return(0);
}
@@ -12884,6 +13930,304 @@ static SHOW_VAR innodb_status_variables_export[]= {
static struct st_mysql_storage_engine innobase_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
+#ifdef WITH_WSREP
+/** Logs that a transaction tried to abort a slave transaction and then
+terminates the process with abort(). Does not return.
+@param bf_seqno     seqno logged as the aborting transaction
+@param victim_seqno seqno logged as the slave transaction */
+void
+wsrep_abort_slave_trx(wsrep_seqno_t bf_seqno, wsrep_seqno_t victim_seqno)
+{
+	/* Widen once so the values match the %lld conversions below. */
+	const long long	aborter = (long long)bf_seqno;
+	const long long	victim  = (long long)victim_seqno;
+
+	WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be "
+		"caused by:\n\t"
+		"1) unsupported configuration options combination, please check documentation.\n\t"
+		"2) a bug in the code.\n\t"
+		"3) a database corruption.\n Node consistency compromized, "
+		"need to abort. Restart the node to resync with cluster.",
+		aborter, victim);
+
+	abort();
+}
+
+/** Marks a victim transaction's thread for abort on behalf of a BF thread
+and wakes it up / cancels its work according to its wsrep query state.
+
+The victim THD is locked with wsrep_thd_LOCK() while its conflict and
+query state are inspected, and unlocked again on every return path.
+Depending on the query state the function cancels a pending pre-commit
+(QUERY_COMMITTING), cancels a lock wait or awakes the thread (QUERY_EXEC),
+or queues the thread on wsrep_aborting_thd and signals
+COND_wsrep_rollback (QUERY_IDLE). A victim in REPL_RECV exec mode leads to
+wsrep_abort_slave_trx(), which aborts the process.
+@return 1 when the victim or the BF thread handle is missing or
+abort_pre_commit() returned WSREP_WARNING, 0 otherwise */
+int
+wsrep_innobase_kill_one_trx(
+	void	*bf_thd_ptr,		/*!< in: BF thd */
+	trx_t	*bf_trx,		/*!< in: BF trx */
+	trx_t	*victim_trx,		/*!< in: victim trx */
+	ibool	signal,			/*!< in: signal to be used */
+	ibool	have_kernel_mutex)	/*!<in: do we own kernel mutex */
+{
+	DBUG_ENTER("wsrep_innobase_kill_one_trx");
+	THD *bf_thd = (THD *)bf_thd_ptr;
+	THD *thd = (THD *) victim_trx->mysql_thd;
+	/* NOTE(review): the seqno is read from the caller supplied handle
+	only; it stays 0 when bf_thd is taken from bf_trx below. */
+	int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0;
+
+	if (have_kernel_mutex) {
+		ut_ad(mutex_own(&kernel_mutex));
+	}
+
+	if (!bf_thd) bf_thd = (bf_trx) ? (THD *)bf_trx->mysql_thd : NULL;
+
+	if (!thd) {
+		DBUG_PRINT("wsrep", ("no thd for conflicting lock"));
+		WSREP_WARN("no THD for trx: %llu", victim_trx->id);
+		DBUG_RETURN(1);
+	}
+	if (!bf_thd) {
+		DBUG_PRINT("wsrep", ("no BF thd for conflicting lock"));
+		WSREP_WARN("no BF THD for trx: %llu", (bf_trx) ? bf_trx->id : 0);
+		DBUG_RETURN(1);
+	}
+
+	WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
+
+	WSREP_DEBUG("BF kill (%lu, seqno: %lld), victim: (%lu) trx: %llu",
+		    signal, (long long)bf_seqno,
+		    wsrep_thd_thread_id(thd),
+		    victim_trx->id);
+
+	WSREP_DEBUG("Aborting query: %s",
+		    (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void");
+
+	wsrep_thd_LOCK(thd);
+
+	if (wsrep_thd_query_state(thd) == QUERY_EXITING) {
+		WSREP_DEBUG("kill trx EXITING for %llu", victim_trx->id);
+		wsrep_thd_UNLOCK(thd);
+		DBUG_RETURN(0);
+	}
+	if(wsrep_thd_exec_mode(thd) != LOCAL_STATE) {
+		WSREP_DEBUG("withdraw for BF trx: %llu, state: %d",
+			    victim_trx->id,
+			    wsrep_thd_conflict_state(thd));
+	}
+
+	/* Only a victim without a pending conflict continues below; all
+	other conflict states return from inside this switch. */
+	switch (wsrep_thd_conflict_state(thd)) {
+	case NO_CONFLICT:
+		wsrep_thd_set_conflict_state(thd, MUST_ABORT);
+		break;
+	case MUST_ABORT:
+		WSREP_DEBUG("victim %llu in MUST ABORT state",
+			    victim_trx->id);
+		wsrep_thd_UNLOCK(thd);
+		wsrep_thd_awake(bf_thd, thd, signal);
+		DBUG_RETURN(0);
+		break;
+	case ABORTED:
+	case ABORTING: // fall through
+	default:
+		WSREP_DEBUG("victim %llu in state %d",
+			    victim_trx->id, wsrep_thd_conflict_state(thd));
+		wsrep_thd_UNLOCK(thd);
+		DBUG_RETURN(0);
+		break;
+	}
+
+	switch (wsrep_thd_query_state(thd)) {
+	case QUERY_COMMITTING:
+		enum wsrep_status rcode;
+
+		WSREP_DEBUG("kill query for: %ld",
+			    wsrep_thd_thread_id(thd));
+		wsrep_thd_awake(bf_thd, thd, signal);
+		WSREP_DEBUG("kill trx QUERY_COMMITTING for %llu",
+			    victim_trx->id);
+
+		if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
+			wsrep_abort_slave_trx(bf_seqno,
+					      wsrep_thd_trx_seqno(thd));
+		} else {
+			rcode = wsrep->abort_pre_commit(
+				wsrep, bf_seqno,
+				(wsrep_trx_id_t)victim_trx->id
+			);
+
+			switch (rcode) {
+			case WSREP_WARNING:
+				WSREP_DEBUG("cancel commit warning: %llu",
+					    victim_trx->id);
+				wsrep_thd_UNLOCK(thd);
+				DBUG_RETURN(1);
+				break;
+			case WSREP_OK:
+				break;
+			default:
+				WSREP_ERROR(
+					"cancel commit bad exit: %d %llu",
+					rcode,
+					victim_trx->id);
+				/* unable to interrupt, must abort */
+				/* note: kill_mysql() will block, if we cannot.
+				 * kill the lock holder first.
+				 */
+				abort();
+				break;
+			}
+		}
+		break;
+	case QUERY_EXEC:
+		/* it is possible that victim trx is itself waiting for some
+		 * other lock. We need to cancel this waiting
+		 */
+		WSREP_DEBUG("kill trx QUERY_EXEC for %llu", victim_trx->id);
+
+		victim_trx->was_chosen_as_deadlock_victim= TRUE;
+		if (victim_trx->wait_lock) {
+			WSREP_DEBUG("victim has wait flag: %ld",
+				    wsrep_thd_thread_id(thd));
+			lock_t*  wait_lock = victim_trx->wait_lock;
+			if (wait_lock) {
+				WSREP_DEBUG("canceling wait lock");
+				victim_trx->was_chosen_as_deadlock_victim= TRUE;
+				if (!have_kernel_mutex) {
+					mutex_enter(&kernel_mutex);
+				}
+				lock_cancel_waiting_and_release(wait_lock);
+				/* If we already have kernel mutex when we
+				arrived to this function, do not yet release
+				it */
+				if (!have_kernel_mutex) {
+					mutex_exit(&kernel_mutex);
+				}
+			}
+
+			wsrep_thd_awake(bf_thd, thd, signal);
+		} else {
+			/* abort currently executing query */
+			DBUG_PRINT("wsrep",("sending KILL_QUERY to: %ld",
+					    wsrep_thd_thread_id(thd)));
+			WSREP_DEBUG("kill query for: %ld",
+				    wsrep_thd_thread_id(thd));
+			wsrep_thd_awake(bf_thd, thd, signal);
+
+			/* for BF thd, we need to prevent him from committing */
+			if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
+				wsrep_abort_slave_trx(bf_seqno,
+						      wsrep_thd_trx_seqno(thd));
+			}
+		}
+		break;
+	case QUERY_IDLE:
+	{
+		bool skip_abort= false;
+		wsrep_aborting_thd_t abortees;
+
+		WSREP_DEBUG("kill IDLE for %llu", victim_trx->id);
+
+		if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
+			WSREP_DEBUG("kill BF IDLE, seqno: %lld",
+				    (long long)wsrep_thd_trx_seqno(thd));
+			wsrep_thd_UNLOCK(thd);
+			wsrep_abort_slave_trx(bf_seqno,
+					      wsrep_thd_trx_seqno(thd));
+			DBUG_RETURN(0);
+		}
+		/* This will lock thd from proceeding after net_read() */
+		wsrep_thd_set_conflict_state(thd, ABORTING);
+
+		mysql_mutex_lock(&LOCK_wsrep_rollback);
+
+		abortees = wsrep_aborting_thd;
+		while (abortees && !skip_abort) {
+			/* check if we have a kill message for this already */
+			if (abortees->aborting_thd == thd) {
+				skip_abort = true;
+				WSREP_WARN("duplicate thd aborter %lu",
+					   wsrep_thd_thread_id(thd));
+			}
+			abortees = abortees->next;
+		}
+		if (!skip_abort) {
+			wsrep_aborting_thd_t aborting = (wsrep_aborting_thd_t)
+				my_malloc(sizeof(struct wsrep_aborting_thd),
+					  MYF(0));
+			if (!aborting) {
+				/* The victim is already marked ABORTING
+				and cannot be queued for rollback; stop
+				with a diagnostic instead of dereferencing
+				a NULL pointer below. */
+				WSREP_ERROR(
+					"out of memory when enqueuing trx "
+					"abort for %lu",
+					wsrep_thd_thread_id(thd));
+				abort();
+			}
+			aborting->aborting_thd  = thd;
+			aborting->next          = wsrep_aborting_thd;
+			wsrep_aborting_thd      = aborting;
+			DBUG_PRINT("wsrep",("enqueuing trx abort for %lu",
+					    wsrep_thd_thread_id(thd)));
+			WSREP_DEBUG("enqueuing trx abort for (%lu)",
+				    wsrep_thd_thread_id(thd));
+		}
+
+		DBUG_PRINT("wsrep",("signalling wsrep rollbacker"));
+		WSREP_DEBUG("signaling aborter");
+		mysql_cond_signal(&COND_wsrep_rollback);
+		mysql_mutex_unlock(&LOCK_wsrep_rollback);
+
+		break;
+	}
+	default:
+		WSREP_WARN("bad wsrep query state: %d",
+			   wsrep_thd_query_state(thd));
+		break;
+	}
+	wsrep_thd_UNLOCK(thd);
+
+	DBUG_RETURN(0);
+}
+/** Aborts the transaction of victim_thd on behalf of bf_thd.
+
+When the victim has an InnoDB transaction, the work is delegated to
+wsrep_innobase_kill_one_trx() and wsrep_srv_conc_cancel_wait() is called
+for it afterwards. Otherwise the victim thread is only marked MUST_ABORT
+and awakened. The hton argument is not used.
+@return the result of wsrep_innobase_kill_one_trx() when the victim has a
+transaction, -1 otherwise */
+static int
+wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
+			my_bool signal)
+{
+	DBUG_ENTER("wsrep_innobase_abort_thd");
+	trx_t* victim_trx = thd_to_trx(victim_thd);
+	trx_t* bf_trx = (bf_thd) ? thd_to_trx(bf_thd) : NULL;
+
+	ut_ad(!mutex_own(&kernel_mutex));
+
+	/* bf_thd may be NULL (see above) and a thread may have no query
+	string; never hand NULL to %s. */
+	WSREP_DEBUG("abort transaction: BF: %s victim: %s",
+		    (bf_thd && wsrep_thd_query(bf_thd)) ?
+		    wsrep_thd_query(bf_thd) : "void",
+		    (wsrep_thd_query(victim_thd)) ?
+		    wsrep_thd_query(victim_thd) : "void");
+
+	if (victim_trx)
+	{
+		int rcode = wsrep_innobase_kill_one_trx(
+			bf_thd, bf_trx, victim_trx, signal, FALSE);
+		wsrep_srv_conc_cancel_wait(victim_trx);
+		DBUG_RETURN(rcode);
+	} else {
+		WSREP_DEBUG("victim does not have transaction");
+		wsrep_thd_LOCK(victim_thd);
+		wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT);
+		wsrep_thd_UNLOCK(victim_thd);
+		wsrep_thd_awake(bf_thd, victim_thd, signal);
+	}
+	DBUG_RETURN(-1);
+}
+
+/** Stores a wsrep XID as the checkpoint in the trx system header inside
+its own mini-transaction and then flushes the logs.
+@return 0 when the checkpoint was written, 1 when xid is not a wsrep XID */
+static int innobase_wsrep_set_checkpoint(handlerton* hton, const XID* xid)
+{
+	DBUG_ASSERT(hton == innodb_hton_ptr);
+
+	/* Anything but a wsrep XID is rejected without touching the header. */
+	if (!wsrep_is_wsrep_xid(xid)) {
+		return 1;
+	}
+
+	mtr_t	mini_trx;
+
+	mtr_start(&mini_trx);
+	trx_sysf_t*	header = trx_sysf_get(&mini_trx);
+	trx_sys_update_wsrep_checkpoint(xid, header, &mini_trx);
+	mtr_commit(&mini_trx);
+
+	innobase_flush_logs(hton);
+
+	return 0;
+}
+
+/* Reads the wsrep checkpoint XID into *xid by calling
+trx_sys_read_wsrep_checkpoint(). hton is only checked by the debug
+assertion. Always returns 0. */
+static int innobase_wsrep_get_checkpoint(handlerton* hton, XID* xid)
+{
+ DBUG_ASSERT(hton == innodb_hton_ptr);
+ trx_sys_read_wsrep_checkpoint(xid);
+ return 0;
+}
+
+/** Allocates a new transaction id under kernel_mutex and passes it,
+together with the thread's write set handle, to
+wsrep_ws_handle_for_trx(). The hton argument is not used. */
+static void
+wsrep_fake_trx_id(
+/*==================*/
+	handlerton	*hton,
+	THD		*thd)	/*!< in: user thread handle */
+{
+	mutex_enter(&kernel_mutex);
+	trx_id_t trx_id = trx_sys_get_new_trx_id();
+	mutex_exit(&kernel_mutex);
+
+	/* The return value is deliberately discarded; (void), not
+	(void *), is the cast that expresses this. */
+	(void) wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd), trx_id);
+}
+
+#endif /* WITH_WSREP */
/* plugin options */
static MYSQL_SYSVAR_BOOL(checksums, innobase_use_checksums,
PLUGIN_VAR_NOCMDARG | PLUGIN_VAR_READONLY,
@@ -13378,6 +14722,40 @@ static MYSQL_SYSVAR_UINT(change_buffering_debug, ibuf_debug,
NULL, NULL, 0, 0, 2, 0);
#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
+#ifdef WITH_INNODB_DISALLOW_WRITES
+/*******************************************************
+ * innobase_disallow_writes variable definition *
+ *******************************************************/
+
+/* Must always init to FALSE. */
+static my_bool innobase_disallow_writes = FALSE;
+
+/**************************************************************************
+Update callback of the innobase_disallow_writes variable: stores the new
+value and resets srv_allow_writes_event when it is true, or sets the event
+when it is false. */
+static
+void
+innobase_disallow_writes_update(
+/*============================*/
+	THD*			thd,		/* in: thread handle */
+	st_mysql_sys_var*	var,		/* in: pointer to system
+						variable */
+	void*			var_ptr,	/* out: pointer to dynamic
+						variable */
+	const void*		save)		/* in: temporary storage */
+{
+	my_bool* const	target = (my_bool*) var_ptr;
+
+	/* Store first, then decide on the value that was stored. */
+	*target = *(const my_bool*) save;
+
+	ut_a(srv_allow_writes_event);
+
+	if (!*target) {
+		os_event_set(srv_allow_writes_event);
+	} else {
+		os_event_reset(srv_allow_writes_event);
+	}
+}
+
+static MYSQL_SYSVAR_BOOL(disallow_writes, innobase_disallow_writes,
+ PLUGIN_VAR_NOCMDOPT,
+ "Tell InnoDB to stop any writes to disk",
+ NULL, innobase_disallow_writes_update, FALSE);
+#endif /* WITH_INNODB_DISALLOW_WRITES */
static MYSQL_SYSVAR_BOOL(random_read_ahead, srv_random_read_ahead,
PLUGIN_VAR_NOCMDARG,
"Whether to use read ahead for random access within an extent.",
@@ -13715,6 +15093,9 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
#if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
MYSQL_SYSVAR(change_buffering_debug),
#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
+#ifdef WITH_INNODB_DISALLOW_WRITES
+ MYSQL_SYSVAR(disallow_writes),
+#endif /* WITH_INNODB_DISALLOW_WRITES */
MYSQL_SYSVAR(random_read_ahead),
MYSQL_SYSVAR(read_ahead_threshold),
MYSQL_SYSVAR(io_capacity),
diff --git a/storage/xtradb/handler/ha_innodb.h b/storage/xtradb/handler/ha_innodb.h
index 39ca89cf6e8..ca764d46c2b 100644
--- a/storage/xtradb/handler/ha_innodb.h
+++ b/storage/xtradb/handler/ha_innodb.h
@@ -115,6 +115,10 @@ class ha_innobase: public handler
dict_index_t* innobase_get_index(uint keynr);
int info_low(uint flag, bool called_from_analyze);
+#ifdef WITH_WSREP
+ int wsrep_append_keys(THD *thd, bool shared,
+ const uchar* record0, const uchar* record1);
+#endif
/* Init values for the class: */
public:
ha_innobase(handlerton *hton, TABLE_SHARE *table_arg);
@@ -372,6 +376,40 @@ bool thd_sqlcom_can_generate_row_events(const MYSQL_THD thd);
*/
extern void mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file);
+#ifdef WITH_WSREP
+#include <wsrep_mysqld.h>
+//extern "C" int wsrep_trx_order_before(void *thd1, void *thd2);
+
+extern "C" bool wsrep_thd_is_wsrep_on(THD *thd);
+
+extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd);
+extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd);
+extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd);
+extern "C" const char * wsrep_thd_exec_mode_str(THD *thd);
+extern "C" const char * wsrep_thd_conflict_state_str(THD *thd);
+extern "C" const char * wsrep_thd_query_state_str(THD *thd);
+extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd);
+
+extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode);
+extern "C" void wsrep_thd_set_query_state(
+ THD *thd, enum wsrep_query_state state);
+extern "C" void wsrep_thd_set_conflict_state(
+ THD *thd, enum wsrep_conflict_state state);
+
+extern "C" void wsrep_thd_set_trx_to_replay(THD *thd, uint64 trx_id);
+
+extern "C"void wsrep_thd_LOCK(THD *thd);
+extern "C"void wsrep_thd_UNLOCK(THD *thd);
+extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd);
+extern "C" time_t wsrep_thd_query_start(THD *thd);
+extern "C" my_thread_id wsrep_thd_thread_id(THD *thd);
+extern "C" int64_t wsrep_thd_trx_seqno(THD *thd);
+extern "C" query_id_t wsrep_thd_query_id(THD *thd);
+extern "C" char * wsrep_thd_query(THD *thd);
+extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd);
+extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id);
+extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal);
+#endif
typedef struct trx_struct trx_t;
/********************************************************************//**
@file handler/ha_innodb.h
@@ -412,3 +450,6 @@ innobase_index_name_is_reserved(
ulint num_of_keys); /*!< in: Number of indexes to
be created. */
+#ifdef WITH_WSREP
+extern "C" int wsrep_trx_is_aborting(void *thd_ptr);
+#endif
diff --git a/storage/xtradb/handler/handler0alter.cc b/storage/xtradb/handler/handler0alter.cc
index 2b2b2b536a8..a483860097b 100644
--- a/storage/xtradb/handler/handler0alter.cc
+++ b/storage/xtradb/handler/handler0alter.cc
@@ -37,6 +37,10 @@ extern "C" {
#include "handler0alter.h"
}
+#ifdef WITH_WSREP
+//#include "wsrep_api.h"
+#include <sql_acl.h> // PROCESS_ACL
+#endif
#include "ha_innodb.h"
/*************************************************************//**
diff --git a/storage/xtradb/include/dict0mem.h b/storage/xtradb/include/dict0mem.h
index ba98eed7701..f174103c3fe 100644
--- a/storage/xtradb/include/dict0mem.h
+++ b/storage/xtradb/include/dict0mem.h
@@ -344,6 +344,9 @@ barracuda format, the length could be REC_VERSION_56_MAX_INDEX_COL_LEN
/** Defines the maximum fixed length column size */
#define DICT_MAX_FIXED_COL_LEN DICT_ANTELOPE_MAX_INDEX_COL_LEN
+#ifdef WITH_WSREP
+#define WSREP_MAX_SUPPORTED_KEY_LENGTH 3500
+#endif /* WITH_WSREP */
/** Data structure for a field in an index */
struct dict_field_struct{
diff --git a/storage/xtradb/include/ha_prototypes.h b/storage/xtradb/include/ha_prototypes.h
index ec2ba77f784..60d792fc5f1 100644
--- a/storage/xtradb/include/ha_prototypes.h
+++ b/storage/xtradb/include/ha_prototypes.h
@@ -300,6 +300,19 @@ thd_flush_log_at_trx_commit(
/*================================*/
void* thd);
+#ifdef WITH_WSREP
+UNIV_INTERN int wsrep_innobase_kill_one_trx(void *thd, trx_t *bf_trx, trx_t *victim_trx,
+ ibool signal, ibool have_kernel_mutex);
+my_bool wsrep_thd_is_BF(void *thd_ptr, my_bool sync);
+//int64_t wsrep_thd_trx_seqno(THD *thd);
+int wsrep_trx_order_before(void *thd1, void *thd2);
+int wsrep_innobase_mysql_sort(int mysql_type, uint charset_number,
+ unsigned char* str, unsigned int str_length,
+ unsigned int buf_length);
+int wsrep_on(void *thd_ptr);
+int wsrep_is_wsrep_xid(const void*);
+my_bool wsrep_thd_set_PA_safe(void *thd_ptr, my_bool safe);
+#endif /* WITH_WSREP */
/**********************************************************************//**
Get the current setting of the lower_case_table_names global parameter from
mysqld.cc. We do a dirty read because for one there is no synchronization
diff --git a/storage/xtradb/include/lock0lock.h b/storage/xtradb/include/lock0lock.h
index c6684f36300..a196c51651f 100644
--- a/storage/xtradb/include/lock0lock.h
+++ b/storage/xtradb/include/lock0lock.h
@@ -810,9 +810,6 @@ lock_rec_get_page_no(
record */
#define LOCK_CONV_BY_OTHER 4096 /*!< this bit is set when the lock is created
by other transaction */
-#if (LOCK_WAIT|LOCK_GAP|LOCK_REC_NOT_GAP|LOCK_INSERT_INTENTION|LOCK_CONV_BY_OTHER)&LOCK_MODE_MASK
-# error
-#endif
#if (LOCK_WAIT|LOCK_GAP|LOCK_REC_NOT_GAP|LOCK_INSERT_INTENTION|LOCK_CONV_BY_OTHER)&LOCK_TYPE_MASK
# error
#endif
diff --git a/storage/xtradb/include/rem0rec.h b/storage/xtradb/include/rem0rec.h
index 9dd96f609ea..7e76b4f40cb 100644
--- a/storage/xtradb/include/rem0rec.h
+++ b/storage/xtradb/include/rem0rec.h
@@ -836,6 +836,15 @@ are given in one byte (resp. two byte) format. */
two upmost bits in a two byte offset for special purposes */
#define REC_MAX_DATA_SIZE (16 * 1024)
+#ifdef WITH_WSREP
+int wsrep_rec_get_foreign_key(
+ byte *buf, /* out: extracted key */
+ ulint *buf_len, /* in/out: length of buf */
+ const rec_t* rec, /* in: physical record */
+ dict_index_t* index_for, /* in: index for foreign table */
+ dict_index_t* index_ref, /* in: index for referenced table */
+ ibool new_protocol); /* in: protocol > 1 */
+#endif /* WITH_WSREP */
#ifndef UNIV_NONINL
#include "rem0rec.ic"
#endif
diff --git a/storage/xtradb/include/srv0srv.h b/storage/xtradb/include/srv0srv.h
index 5d0d46d1b1a..34c5fa7ed34 100644
--- a/storage/xtradb/include/srv0srv.h
+++ b/storage/xtradb/include/srv0srv.h
@@ -167,6 +167,10 @@ extern ulint srv_log_buffer_size;
extern char srv_use_global_flush_log_at_trx_commit;
extern char srv_adaptive_flushing;
+#ifdef WITH_INNODB_DISALLOW_WRITES
+/* When this event is reset we do not allow any file writes to take place. */
+extern os_event_t srv_allow_writes_event;
+#endif /* WITH_INNODB_DISALLOW_WRITES */
/* If this flag is TRUE, then we will load the indexes' (and tables') metadata
even if they are marked as "corrupted". Mostly it is for DBA to process
corrupted index and table */
@@ -665,6 +669,14 @@ srv_conc_enter_innodb(
/*==================*/
trx_t* trx); /*!< in: transaction object associated with the
thread */
+#ifdef WITH_WSREP
+UNIV_INTERN
+void
+wsrep_srv_conc_cancel_wait(
+/*==================*/
+ trx_t* trx); /*!< in: transaction object associated with the
+ thread */
+#endif /* WITH_WSREP */
/*********************************************************************//**
This lets a thread enter InnoDB regardless of the number of threads inside
InnoDB. This must be called when a thread ends a lock wait. */
diff --git a/storage/xtradb/include/trx0sys.h b/storage/xtradb/include/trx0sys.h
index f284790630c..69457a4c9b1 100644
--- a/storage/xtradb/include/trx0sys.h
+++ b/storage/xtradb/include/trx0sys.h
@@ -41,6 +41,9 @@ Created 3/26/1996 Heikki Tuuri
#include "ut0bh.h"
#include "read0types.h"
#include "page0types.h"
+#ifdef WITH_WSREP
+#include "trx0xa.h"
+#endif /* WITH_WSREP */
/** In a MySQL replication slave, in crash recovery we store the master log
file name and position here. */
@@ -352,6 +355,19 @@ UNIV_INTERN
void
trx_sys_print_mysql_binlog_offset(void);
/*===================================*/
+#ifdef WITH_WSREP
+/** Update WSREP checkpoint XID in sys header. */
+void
+trx_sys_update_wsrep_checkpoint(
+ const XID* xid, /*!< in: WSREP XID */
+ trx_sysf_t* sys_header, /*!< in: sys_header */
+ mtr_t* mtr); /*!< in: mtr */
+
+/** Read WSREP checkpoint XID from sys header. */
+void
+trx_sys_read_wsrep_checkpoint(
+ XID* xid); /*!< out: WSREP XID */
+#endif /* WITH_WSREP */
/*****************************************************************//**
Prints to stderr the MySQL master log offset info in the trx system header
COMMIT set of fields if the magic number shows it valid and stores it
@@ -552,6 +568,22 @@ We must remember this limit in order to keep file compatibility. */
//#if UNIV_PAGE_SIZE < 4096
//# error "UNIV_PAGE_SIZE < 4096"
//#endif
+
+
+/* The offset to WSREP XID headers */
+#ifdef WITH_WSREP
+#define TRX_SYS_WSREP_XID_INFO (UNIV_PAGE_SIZE - 3500)
+#define TRX_SYS_WSREP_XID_MAGIC_N_FLD 0
+#define TRX_SYS_WSREP_XID_MAGIC_N 0x77737265
+
+/* XID field: formatID, gtrid_len, bqual_len, xid_data */
+#define TRX_SYS_WSREP_XID_LEN (4 + 4 + 4 + XIDDATASIZE)
+#define TRX_SYS_WSREP_XID_FORMAT 4
+#define TRX_SYS_WSREP_XID_GTRID_LEN 8
+#define TRX_SYS_WSREP_XID_BQUAL_LEN 12
+#define TRX_SYS_WSREP_XID_DATA 16
+#endif /* WITH_WSREP*/
+
/** The offset of the MySQL replication info in the trx system header;
this contains the same fields as TRX_SYS_MYSQL_LOG_INFO below. These are
written at prepare time and are the main copy. */
diff --git a/storage/xtradb/include/trx0trx.h b/storage/xtradb/include/trx0trx.h
index eac5eb87703..a3774c1d348 100644
--- a/storage/xtradb/include/trx0trx.h
+++ b/storage/xtradb/include/trx0trx.h
@@ -778,6 +778,9 @@ struct trx_struct{
/*------------------------------*/
char detailed_error[256]; /*!< detailed error message for last
error, or empty. */
+#ifdef WITH_WSREP
+ os_event_t wsrep_event; /* event waited for in srv_conc_slot */
+#endif /* WITH_WSREP */
/*------------------------------*/
ulint io_reads;
ib_uint64_t io_read;
diff --git a/storage/xtradb/lock/lock0lock.c b/storage/xtradb/lock/lock0lock.c
index 7330b066515..d9a5768d94c 100644
--- a/storage/xtradb/lock/lock0lock.c
+++ b/storage/xtradb/lock/lock0lock.c
@@ -40,6 +40,10 @@ Created 5/7/1996 Heikki Tuuri
#include "trx0sys.h"
#include "btr0btr.h"
+#ifdef WITH_WSREP
+extern my_bool wsrep_debug;
+extern my_bool wsrep_log_conflicts;
+#endif
/* Restricts the length of search we will do in the waits-for
graph of transactions */
#define LOCK_MAX_N_STEPS_IN_DEADLOCK_CHECK 1000000
@@ -906,6 +910,9 @@ UNIV_INLINE
ibool
lock_rec_has_to_wait(
/*=================*/
+#ifdef WITH_WSREP
+ ibool for_locking, /*!< is caller locking or releasing */
+#endif /* WITH_WSREP */
const trx_t* trx, /*!< in: trx of new lock */
ulint type_mode,/*!< in: precise mode of the new lock
to set: LOCK_S or LOCK_X, possibly
@@ -975,6 +982,44 @@ lock_rec_has_to_wait(
return(FALSE);
}
+#ifdef WITH_WSREP
+ /* if BF thread is locking and has conflict with another BF
+ thread, we need to look at trx ordering and lock types */
+ if (for_locking &&
+ wsrep_thd_is_BF(trx->mysql_thd, FALSE) &&
+ wsrep_thd_is_BF(lock2->trx->mysql_thd, TRUE)) {
+
+ if (wsrep_debug) {
+ fprintf(stderr, "\n BF-BF lock conflict \n");
+ lock_rec_print(stderr, lock2);
+ }
+
+ if (wsrep_trx_order_before(trx->mysql_thd,
+ lock2->trx->mysql_thd) &&
+ (type_mode & LOCK_MODE_MASK) == LOCK_X &&
+ (lock2->type_mode & LOCK_MODE_MASK) == LOCK_X)
+ {
+ /* exclusive lock conflicts are not accepted */
+ fprintf(stderr, "BF-BF X lock conflict\n");
+ lock_rec_print(stderr, lock2);
+ abort();
+ } else {
+ /* if lock2->index->n_uniq <=
+ lock2->index->n_user_defined_cols
+ operation is on uniq index
+ */
+ if (wsrep_debug) fprintf(stderr,
+ "BF conflict, modes: %lu %lu, "
+ "idx: %s-%s n_uniq %u n_user %u\n",
+ type_mode, lock2->type_mode,
+ lock2->index->name,
+ lock2->index->table_name,
+ lock2->index->n_uniq,
+ lock2->index->n_user_defined_cols);
+ return FALSE;
+ }
+ }
+#endif /* WITH_WSREP */
return(TRUE);
}
@@ -1005,7 +1050,11 @@ lock_has_to_wait(
/* If this lock request is for a supremum record
then the second bit on the lock bitmap is set */
+#ifdef WITH_WSREP
+ return(lock_rec_has_to_wait(FALSE, lock1->trx,
+#else
return(lock_rec_has_to_wait(lock1->trx,
+#endif /* WITH_WSREP */
lock1->type_mode, lock2,
lock_rec_get_nth_bit(
lock1, 1)));
@@ -1455,6 +1504,11 @@ lock_rec_has_expl(
return(NULL);
}
+#ifdef WITH_WSREP
+static
+void
+lock_rec_discard(lock_t* in_lock);
+#endif
#ifdef UNIV_DEBUG
/*********************************************************************//**
Checks if some other transaction has a lock request in the queue.
@@ -1504,6 +1558,53 @@ lock_rec_other_has_expl_req(
}
#endif /* UNIV_DEBUG */
+#ifdef WITH_WSREP
+/*********************************************************************//**
+Resolves a lock conflict in favour of a wsrep BF (priority) transaction.
+Called when the lock requested by trx conflicts with a lock found in the
+queue. The owner of that lock is treated as the victim when trx is BF and
+either the owner is not BF, or both are BF and wsrep_trx_order_before()
+reports that trx is ordered before the owner. A victim that is itself in
+TRX_QUE_LOCK_WAIT state is not killed here, and neither is trx itself. */
+static
+void
+wsrep_kill_victim(
+/*==============*/
+	trx_t*	trx,	/*!< in: transaction requesting the lock */
+	lock_t*	lock)	/*!< in: conflicting lock found in the queue */
+{
+	my_bool	bf_this  = wsrep_thd_is_BF(trx->mysql_thd, FALSE);
+	my_bool	bf_other = wsrep_thd_is_BF(lock->trx->mysql_thd, TRUE);
+
+	/* Only a BF requester may kill, and a BF lock owner yields only
+	when the requester is ordered before it. */
+	if (!bf_this
+	    || (bf_other
+		&& !wsrep_trx_order_before(trx->mysql_thd,
+					   lock->trx->mysql_thd))) {
+		return;
+	}
+
+	if (lock->trx->que_state == TRX_QUE_LOCK_WAIT) {
+		if (wsrep_debug) {
+			fprintf(stderr, "WSREP: BF victim waiting\n");
+		}
+		/* cannot release lock, until our lock
+		is in the queue*/
+		return;
+	}
+
+	if (lock->trx == trx) {
+		return;
+	}
+
+	if (wsrep_log_conflicts) {
+		/* trx is always BF at this point, so it is always
+		reported as the priority transaction */
+		fputs("\n*** Priority TRANSACTION:\n", stderr);
+		trx_print(stderr, trx, 3000);
+
+		fputs(bf_other
+		      ? "\n*** Priority TRANSACTION:\n"
+		      : "\n*** Victim TRANSACTION:\n",
+		      stderr);
+		trx_print(stderr, lock->trx, 3000);
+
+		fputs("*** WAITING FOR THIS LOCK TO BE GRANTED:\n", stderr);
+
+		if (lock_get_type(lock) == LOCK_REC) {
+			lock_rec_print(stderr, lock);
+		} else {
+			lock_table_print(stderr, lock);
+		}
+	}
+
+	wsrep_innobase_kill_one_trx(
+		trx->mysql_thd, trx, lock->trx, TRUE, TRUE);
+}
+#endif
/*********************************************************************//**
Checks if some other transaction has a conflicting explicit lock request
in the queue, so that we have to wait.
@@ -1531,8 +1632,15 @@ lock_rec_other_has_conflicting(
if (UNIV_UNLIKELY(heap_no == PAGE_HEAP_NO_SUPREMUM)) {
do {
- if (lock_rec_has_to_wait(trx, mode, lock,
+#ifdef WITH_WSREP
+ if (lock_rec_has_to_wait(TRUE, trx, mode, lock,
+#else
+ if (lock_rec_has_to_wait(trx, mode, lock,
+#endif /* WITH_WSREP */
TRUE)) {
+#ifdef WITH_WSREP
+ wsrep_kill_victim(trx, lock);
+#endif
return(lock);
}
@@ -1541,8 +1649,15 @@ lock_rec_other_has_conflicting(
} else {
do {
- if (lock_rec_has_to_wait(trx, mode, lock,
+#ifdef WITH_WSREP
+ if (lock_rec_has_to_wait(TRUE, trx, mode, lock,
+#else
+ if (lock_rec_has_to_wait(trx, mode, lock,
+#endif /* WITH_WSREP */
FALSE)) {
+#ifdef WITH_WSREP
+ wsrep_kill_victim(trx, lock);
+#endif
return(lock);
}
@@ -1674,6 +1789,9 @@ static
lock_t*
lock_rec_create(
/*============*/
+#ifdef WITH_WSREP
+ lock_t* c_lock, /* conflicting lock */
+#endif
ulint type_mode,/*!< in: lock mode and wait
flag, type is ignored and
replaced by LOCK_REC */
@@ -1733,9 +1851,66 @@ lock_rec_create(
/* Set the bit corresponding to rec */
lock_rec_set_nth_bit(lock, heap_no);
+#ifdef WITH_WSREP
+ if (c_lock && wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
+ lock_t *hash = c_lock->hash;
+ lock_t *prev = NULL;
+
+ while (hash &&
+ wsrep_thd_is_BF(hash->trx->mysql_thd, TRUE) &&
+ wsrep_trx_order_before(
+ hash->trx->mysql_thd, trx->mysql_thd))
+ {
+ prev = hash;
+ hash = hash->hash;
+ }
+ lock->hash = hash;
+ if (prev) {
+ prev->hash = lock;
+ } else {
+ c_lock->hash = lock;
+ }
+ /*
+ * delayed conflict resolution '...kill_one_trx' was not called,
+ * if victim was waiting for some other lock
+ */
+ if (c_lock->trx->que_state == TRX_QUE_LOCK_WAIT) {
+ c_lock->trx->was_chosen_as_deadlock_victim = TRUE;
+
+ if (wsrep_debug && c_lock->trx->wait_lock != c_lock) {
+ fprintf(stderr, "WSREP: c_lock != wait lock\n");
+ lock_rec_print(stderr, c_lock);
+ lock_rec_print(stderr, c_lock->trx->wait_lock);
+ }
+
+ trx->que_state = TRX_QUE_LOCK_WAIT;
+ lock_set_lock_and_trx_wait(lock, trx);
+
+ lock_cancel_waiting_and_release(c_lock->trx->wait_lock);
+
+ /* trx might not wait for c_lock, but some other lock
+ does not matter if wait_lock was released above
+ */
+ if (c_lock->trx->wait_lock == c_lock) {
+ lock_reset_lock_and_trx_wait(lock);
+ }
+
+ if (wsrep_debug) fprintf(
+ stderr,
+ "WSREP: c_lock canceled %llu\n",
+ (ulonglong) c_lock->trx->id);
+
+ /* have to bail out here to avoid lock_set_lock... */
+ return(lock);
+ }
+ } else {
+ HASH_INSERT(lock_t, hash, lock_sys->rec_hash,
+ lock_rec_fold(space, page_no), lock);
+ }
+#else
HASH_INSERT(lock_t, hash, lock_sys->rec_hash,
lock_rec_fold(space, page_no), lock);
-
+#endif /* WITH_WSREP */
lock_sys->rec_num++;
if (lock_is_wait_not_by_other(type_mode)) {
lock_set_lock_and_trx_wait(lock, trx);
@@ -1755,6 +1930,9 @@ static
enum db_err
lock_rec_enqueue_waiting(
/*=====================*/
+#ifdef WITH_WSREP
+ lock_t* c_lock, /* conflicting lock */
+#endif
ulint type_mode,/*!< in: lock mode this
transaction is requesting:
LOCK_S or LOCK_X, possibly
@@ -1809,9 +1987,18 @@ lock_rec_enqueue_waiting(
}
if (lock == NULL) {
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) && trx->was_chosen_as_deadlock_victim) {
+ return(DB_DEADLOCK);
+ }
+ /* Enqueue the lock request that will wait to be granted */
+ lock = lock_rec_create(c_lock, type_mode | LOCK_WAIT,
+ block, heap_no, index, trx);
+#else
/* Enqueue the lock request that will wait to be granted */
lock = lock_rec_create(type_mode | LOCK_WAIT,
block, heap_no, index, trx);
+#endif /*WITH_WSREP */
} else {
ut_ad(lock->type_mode & LOCK_WAIT);
ut_ad(lock->type_mode & LOCK_CONV_BY_OTHER);
@@ -1900,7 +2087,19 @@ lock_rec_add_to_queue(
lock_t* other_lock
= lock_rec_other_has_expl_req(mode, 0, LOCK_WAIT,
block, heap_no, trx);
+#ifdef WITH_WSREP
+ /* this can potentially assert with wsrep */
+ if (wsrep_on(trx->mysql_thd)) {
+ if (wsrep_debug && other_lock) {
+ fprintf(stderr,
+ "WSREP: InnoDB assert ignored\n");
+ }
+ } else {
+ ut_a(!other_lock);
+ }
+#else
ut_a(!other_lock);
+#endif /* WITH_WSREP */
}
#endif /* UNIV_DEBUG */
@@ -1953,7 +2152,11 @@ lock_rec_add_to_queue(
}
somebody_waits:
+#ifdef WITH_WSREP
+ return(lock_rec_create(NULL, type_mode, block, heap_no, index, trx));
+#else
return(lock_rec_create(type_mode, block, heap_no, index, trx));
+#endif
}
/** Record locking request status */
@@ -2013,7 +2216,11 @@ lock_rec_lock_fast(
if (lock == NULL) {
if (!impl) {
+#ifdef WITH_WSREP
+ lock_rec_create(NULL, mode, block, heap_no, index, trx);
+#else
lock_rec_create(mode, block, heap_no, index, trx);
+#endif
}
return(LOCK_REC_SUCCESS_CREATED);
@@ -2069,6 +2276,9 @@ lock_rec_lock_slow(
que_thr_t* thr) /*!< in: query thread */
{
trx_t* trx;
+#ifdef WITH_WSREP
+ lock_t* c_lock = NULL;
+#endif
lock_t* lock;
ut_ad(mutex_own(&kernel_mutex));
@@ -2110,7 +2320,12 @@ lock_rec_lock_slow(
/* The trx already has a strong enough lock on rec: do
nothing */
+#ifdef WITH_WSREP
+ } else if ((c_lock = lock_rec_other_has_conflicting(
+ mode, block, heap_no, trx))) {
+#else
} else if (lock_rec_other_has_conflicting(mode, block, heap_no, trx)) {
+#endif /* WITH_WSREP */
/* If another transaction has a non-gap conflicting request in
the queue, as this transaction does not have a lock strong
@@ -2118,8 +2333,16 @@ lock_rec_lock_slow(
ut_ad(lock == NULL);
enqueue_waiting:
+#ifdef WITH_WSREP
+ /* c_lock is NULL here if jump to enqueue_waiting happened
+ but it's ok because lock is not NULL in that case and c_lock
+ is not used. */
+ return(lock_rec_enqueue_waiting(c_lock, mode, block, heap_no,
+ lock, index, thr));
+#else
return(lock_rec_enqueue_waiting(mode, block, heap_no,
lock, index, thr));
+#endif /* WITH_WSREP */
} else if (!impl) {
/* Set the requested lock on the record */
@@ -2166,7 +2389,6 @@ lock_rec_lock(
ut_ad(mode - (LOCK_MODE_MASK & mode) == LOCK_GAP
|| mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP
|| mode - (LOCK_MODE_MASK & mode) == 0);
-
/* We try a simplified and faster subroutine for the most
common cases */
switch (lock_rec_lock_fast(impl, mode, block, heap_no, index, thr)) {
@@ -3601,6 +3823,26 @@ lock_deadlock_recursive(
stderr);
}
#endif /* UNIV_DEBUG */
+#ifdef WITH_WSREP
+ if (wsrep_debug)
+ fputs("WSREP: Deadlock detected\n", stderr);
+ if ((start != wait_lock->trx) &&
+ wsrep_thd_is_BF(start->mysql_thd, TRUE) &&
+ wsrep_thd_is_BF(
+ wait_lock->trx->mysql_thd, TRUE))
+ {
+ if (wsrep_trx_order_before(
+ start->mysql_thd,
+ wait_lock->trx->mysql_thd)) {
+
+ wait_lock->trx->was_chosen_as_deadlock_victim = TRUE;
+ lock_cancel_waiting_and_release(wait_lock);
+ return(LOCK_VICTIM_IS_OTHER);
+ } else {
+ return(LOCK_VICTIM_IS_START);
+ }
+ }
+#endif
if (trx_weight_ge(wait_lock->trx, start)) {
/* Our recursion starting point
@@ -3608,8 +3850,23 @@ lock_deadlock_recursive(
choose 'start' as the victim and roll
back it */
+#ifdef WITH_WSREP
+ if (!wsrep_thd_is_BF(
+ start->mysql_thd, FALSE))
+ {
+ return(LOCK_VICTIM_IS_START);
+ }
+#else
+ return(LOCK_VICTIM_IS_START);
+#endif
+ }
+#ifdef WITH_WSREP
+ if (wsrep_thd_is_BF(
+ wait_lock->trx->mysql_thd, TRUE))
+ {
return(LOCK_VICTIM_IS_START);
}
+#endif
lock_deadlock_found = TRUE;
@@ -3694,6 +3951,9 @@ UNIV_INLINE
lock_t*
lock_table_create(
/*==============*/
+#ifdef WITH_WSREP
+ lock_t* c_lock, /* conflicting lock */
+#endif
dict_table_t* table, /*!< in: database table in dictionary cache */
ulint type_mode,/*!< in: lock mode possibly ORed with
LOCK_WAIT */
@@ -3730,7 +3990,40 @@ lock_table_create(
lock->un_member.tab_lock.table = table;
+#ifdef WITH_WSREP
+ if (c_lock && wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
+ UT_LIST_INSERT_AFTER(
+ un_member.tab_lock.locks, table->locks, c_lock, lock);
+ } else {
+ UT_LIST_ADD_LAST(un_member.tab_lock.locks, table->locks, lock);
+ }
+
+ if (c_lock && c_lock->trx->que_state == TRX_QUE_LOCK_WAIT) {
+ if (wsrep_debug) {
+ fprintf(stderr,
+ "WSREP: table c_lock in wait: %llu new loc: %llu\n",
+ (ulonglong) c_lock->trx->id, lock->trx->id);
+ }
+
+ c_lock->trx->was_chosen_as_deadlock_victim = TRUE;
+ lock_cancel_waiting_and_release(c_lock->trx->wait_lock);
+
+ /* trx might not wait for c_lock, but some other lock
+ does not matter if wait_lock was released above
+ */
+ if (c_lock->trx->wait_lock == c_lock) {
+ lock_reset_lock_and_trx_wait(lock);
+ }
+
+ if (wsrep_debug) {
+ fprintf(stderr, "WSREP: c_lock canceled %llu\n",
+ (ulonglong) c_lock->trx->id);
+ }
+ }
+
+#else
UT_LIST_ADD_LAST(un_member.tab_lock.locks, table->locks, lock);
+#endif
if (UNIV_UNLIKELY(type_mode & LOCK_WAIT)) {
@@ -3878,6 +4171,9 @@ static
ulint
lock_table_enqueue_waiting(
/*=======================*/
+#ifdef WITH_WSREP
+ lock_t* c_lock, /* conflicting lock */
+#endif
ulint mode, /*!< in: lock mode this transaction is
requesting */
dict_table_t* table, /*!< in: table */
@@ -3921,7 +4217,14 @@ lock_table_enqueue_waiting(
/* Enqueue the lock request that will wait to be granted */
+#ifdef WITH_WSREP
+ if (trx->was_chosen_as_deadlock_victim) {
+ return(DB_DEADLOCK);
+ }
+ lock = lock_table_create(c_lock, table, mode | LOCK_WAIT, trx);
+#else
lock = lock_table_create(table, mode | LOCK_WAIT, trx);
+#endif
/* Check if a deadlock occurs: if yes, remove the lock request and
return an error code */
@@ -3983,7 +4286,11 @@ lock_table_other_has_incompatible(
if ((lock->trx != trx)
&& (!lock_mode_compatible(lock_get_mode(lock), mode))
&& (wait || !(lock_get_wait(lock)))) {
-
+#ifdef WITH_WSREP
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: table lock abort");
+ wsrep_kill_victim((trx_t *)trx, (lock_t *)lock);
+#endif
return(lock);
}
@@ -4007,6 +4314,9 @@ lock_table(
enum lock_mode mode, /*!< in: lock mode */
que_thr_t* thr) /*!< in: query thread */
{
+#ifdef WITH_WSREP
+ lock_t *c_lock;
+#endif
trx_t* trx;
ulint err;
@@ -4039,19 +4349,32 @@ lock_table(
/* We have to check if the new lock is compatible with any locks
other transactions have in the table lock queue. */
+#ifdef WITH_WSREP
+ if ((c_lock = (lock_t *)lock_table_other_has_incompatible(
+ trx, LOCK_WAIT, table, mode))) {
+#else
if (lock_table_other_has_incompatible(trx, LOCK_WAIT, table, mode)) {
+#endif
/* Another trx has a request on the table in an incompatible
mode: this trx may have to wait */
+#ifdef WITH_WSREP
+ err = lock_table_enqueue_waiting(c_lock, mode | flags, table, thr);
+#else
err = lock_table_enqueue_waiting(mode | flags, table, thr);
+#endif
lock_mutex_exit_kernel();
return(err);
}
+#ifdef WITH_WSREP
+ lock_table_create(c_lock, table, mode | flags, trx);
+#else
lock_table_create(table, mode | flags, trx);
+#endif
ut_a(!flags || mode == LOCK_S || mode == LOCK_X);
@@ -4975,6 +5298,7 @@ lock_rec_queue_validate(
if (!lock_rec_get_gap(lock) && !lock_get_wait(lock)) {
+#ifndef WITH_WSREP
enum lock_mode mode;
if (lock_get_mode(lock) == LOCK_S) {
@@ -4984,6 +5308,7 @@ lock_rec_queue_validate(
}
ut_a(!lock_rec_other_has_expl_req(
mode, 0, 0, block, heap_no, lock->trx));
+#endif /* WITH_WSREP */
} else if (lock_get_wait(lock) && !lock_rec_get_gap(lock)) {
@@ -5260,6 +5585,9 @@ lock_rec_insert_check_and_lock(
lock_t* lock;
ulint err;
ulint next_rec_heap_no;
+#ifdef WITH_WSREP
+ lock_t *c_lock;
+#endif
ut_ad(block->frame == page_align(rec));
@@ -5317,15 +5645,28 @@ lock_rec_insert_check_and_lock(
had to wait for their insert. Both had waiting gap type lock requests
on the successor, which produced an unnecessary deadlock. */
+#ifdef WITH_WSREP
+ if ((c_lock = lock_rec_other_has_conflicting(
+ LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION,
+ block, next_rec_heap_no, trx))) {
+#else
if (lock_rec_other_has_conflicting(
LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION,
block, next_rec_heap_no, trx)) {
+#endif
/* Note that we may get DB_SUCCESS also here! */
+#ifdef WITH_WSREP
+ err = lock_rec_enqueue_waiting(c_lock, LOCK_X | LOCK_GAP
+ | LOCK_INSERT_INTENTION,
+ block, next_rec_heap_no,
+ NULL, index, thr);
+#else
err = lock_rec_enqueue_waiting(LOCK_X | LOCK_GAP
| LOCK_INSERT_INTENTION,
block, next_rec_heap_no,
NULL, index, thr);
+#endif /* WITH_WSREP */
} else {
err = DB_SUCCESS;
}
@@ -5409,6 +5750,11 @@ lock_rec_convert_impl_to_expl(
implicit lock. Because cannot lock at this moment.*/
if (rec_get_deleted_flag(rec, rec_offs_comp(offsets))
+#ifdef WITH_WSREP
+ && !wsrep_thd_is_BF(impl_trx->mysql_thd, FALSE)
+ /* BF-BF conflict is possible if advancing into
+ lock_rec_other_has_conflicting*/
+#endif /* WITH_WSREP */
&& lock_rec_other_has_conflicting(
LOCK_X | LOCK_REC_NOT_GAP, block,
heap_no, impl_trx)) {
diff --git a/storage/xtradb/os/os0file.c b/storage/xtradb/os/os0file.c
index e33f086c949..c2c04f8c7a5 100644
--- a/storage/xtradb/os/os0file.c
+++ b/storage/xtradb/os/os0file.c
@@ -103,6 +103,12 @@ UNIV_INTERN os_mutex_t os_file_seek_mutexes[OS_FILE_N_SEEK_MUTEXES];
/* In simulated aio, merge at most this many consecutive i/os */
#define OS_AIO_MERGE_N_CONSECUTIVE 64
+#ifdef WITH_INNODB_DISALLOW_WRITES
+#define WAIT_ALLOW_WRITES() os_event_wait(srv_allow_writes_event)
+#else
+#define WAIT_ALLOW_WRITES() do { } while (0)
+#endif /* WITH_INNODB_DISALLOW_WRITES */
+
/**********************************************************************
InnoDB AIO Implementation:
@@ -826,7 +832,9 @@ os_file_create_tmpfile(void)
/*========================*/
{
FILE* file = NULL;
- int fd = innobase_mysql_tmpfile();
+ int fd;
+ WAIT_ALLOW_WRITES();
+ fd = innobase_mysql_tmpfile();
if (fd >= 0) {
file = fdopen(fd, "w+b");
@@ -1145,6 +1153,7 @@ os_file_create_directory(
return (TRUE);
#else
int rcode;
+ WAIT_ALLOW_WRITES();
rcode = mkdir(pathname, 0770);
@@ -1246,6 +1255,8 @@ try_again:
os_file_t file;
int create_flag;
ibool retry;
+ if (create_mode != OS_FILE_OPEN && create_mode != OS_FILE_OPEN_RAW)
+ WAIT_ALLOW_WRITES();
try_again:
ut_a(name);
@@ -1378,6 +1389,8 @@ os_file_create_simple_no_error_handling_func(
int create_flag;
ut_a(name);
+ if (create_mode != OS_FILE_OPEN && create_mode != OS_FILE_OPEN_RAW)
+ WAIT_ALLOW_WRITES();
if (create_mode == OS_FILE_OPEN) {
if (access_type == OS_FILE_READ_ONLY) {
@@ -1671,6 +1684,8 @@ try_again:
int create_flag;
ibool retry;
const char* mode_str = NULL;
+ if (create_mode != OS_FILE_OPEN && create_mode != OS_FILE_OPEN_RAW)
+ WAIT_ALLOW_WRITES();
DBUG_EXECUTE_IF(
"ib_create_table_fail_disk_full",
@@ -1846,6 +1861,7 @@ loop:
goto loop;
#else
int ret;
+ WAIT_ALLOW_WRITES();
ret = unlink(name);
@@ -1909,6 +1925,7 @@ loop:
goto loop;
#else
int ret;
+ WAIT_ALLOW_WRITES();
ret = unlink(name);
@@ -1949,6 +1966,7 @@ os_file_rename_func(
return(FALSE);
#else
int ret;
+ WAIT_ALLOW_WRITES();
ret = rename(oldpath, newpath);
@@ -2226,6 +2244,7 @@ os_file_set_eof(
HANDLE h = (HANDLE) _get_osfhandle(fileno(file));
return(SetEndOfFile(h));
#else /* __WIN__ */
+ WAIT_ALLOW_WRITES();
return(!ftruncate(fileno(file), ftell(file)));
#endif /* __WIN__ */
}
@@ -2355,6 +2374,7 @@ os_file_flush_func(
return(FALSE);
#else
int ret;
+ WAIT_ALLOW_WRITES();
#if defined(HAVE_DARWIN_THREADS)
# ifndef F_FULLFSYNC
@@ -3032,6 +3052,7 @@ retry:
return(FALSE);
#else
ssize_t ret;
+ WAIT_ALLOW_WRITES();
ret = os_file_pwrite(file, buf, n, offset, offset_high);
diff --git a/storage/xtradb/os/os0proc.c b/storage/xtradb/os/os0proc.c
index 7b52dd2a28f..535bb308e34 100644
--- a/storage/xtradb/os/os0proc.c
+++ b/storage/xtradb/os/os0proc.c
@@ -57,6 +57,9 @@ UNIV_INTERN ibool os_use_large_pages;
/* Large page size. This may be a boot-time option on some platforms */
UNIV_INTERN ulint os_large_page_size;
+#ifdef WITH_WSREP
+extern my_bool wsrep_recovery;
+#endif /* WITH_WSREP */
/****************************************************************//**
Converts the current process id to a number. It is not guaranteed that the
number is unique. In Linux returns the 'process number' of the current
@@ -183,6 +186,12 @@ skip:
# else
size = UNIV_PAGE_SIZE;
# endif
+#ifdef WITH_WSREP
+ /* Don't populate if wsrep_recovery is ON */
+ if (wsrep_recovery) {
+ populate = FALSE;
+ }
+#endif /* WITH_WSREP */
/* Align block size to system page size */
ut_ad(ut_is_2pow(size));
size = *n = ut_2pow_round(*n + (size - 1), size);
diff --git a/storage/xtradb/rem/rem0rec.c b/storage/xtradb/rem/rem0rec.c
index d938aa696dd..cb4e8ca1cb1 100644
--- a/storage/xtradb/rem/rem0rec.c
+++ b/storage/xtradb/rem/rem0rec.c
@@ -31,6 +31,9 @@ Created 5/30/1994 Heikki Tuuri
#include "mtr0mtr.h"
#include "mtr0log.h"
+#ifdef WITH_WSREP
+#include <ha_prototypes.h>
+#endif /* WITH_WSREP */
/* PHYSICAL RECORD (OLD STYLE)
===========================
@@ -1897,3 +1900,133 @@ rec_print(
}
}
#endif /* !UNIV_HOTBACKUP */
+#ifdef WITH_WSREP
+/*********************************************************************//**
+Extracts the key field values of a record into a flat key buffer for
+wsrep. A NULL value is stored as the single byte 1. A non-NULL value is
+preceded by the byte 0 when the corresponding column of the referenced
+index is nullable. On success *buf_len is set to the length of the key.
+@return DB_SUCCESS, or DB_ERROR if the key does not fit in buf */
+int
+wsrep_rec_get_foreign_key(
+/*======================*/
+	byte 		*buf,     /* out: extracted key */
+	ulint 		*buf_len, /* in/out: length of buf */
+	const rec_t*	rec,	  /* in: physical record */
+	dict_index_t*	index_for,  /* in: index in foreign table */
+	dict_index_t*	index_ref,  /* in: index in referenced table */
+	ibool		new_protocol) /* in: protocol > 1 */
+{
+	const byte*	data;
+	ulint		len;
+	ulint		key_len = 0;
+	ulint		i;
+	ulint		j;
+	uint		key_parts;
+	int		err = DB_ERROR;
+	mem_heap_t*	heap = NULL;
+	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
+	const ulint*	offsets;
+
+	/* Check the arguments before any of them is used */
+	ut_ad(rec);
+	ut_ad(index_for);
+	ut_ad(index_ref);
+
+	rec_offs_init(offsets_);
+	offsets = rec_get_offsets(rec, index_for, offsets_,
+				  ULINT_UNDEFINED, &heap);
+
+	ut_ad(rec_offs_validate(rec, NULL, offsets));
+
+	key_parts = dict_index_get_n_unique_in_tree(index_for);
+
+	/* For a non-clustered index the last of the key_parts fields is
+	left out of the key. */
+	for (i = 0;
+	     i < key_parts &&
+	       (index_for->type & DICT_CLUSTERED || i < key_parts - 1);
+	     i++) {
+		dict_field_t*	  field_f =
+			dict_index_get_nth_field(index_for, i);
+		const dict_col_t* col_f = dict_field_get_col(field_f);
+		dict_field_t*	  field_r =
+			dict_index_get_nth_field(index_ref, i);
+		const dict_col_t* col_r = dict_field_get_col(field_r);
+
+		data = rec_get_nth_field(rec, offsets, i, &len);
+
+		/* One extra byte is reserved for the NULL marker */
+		if (key_len + ((len != UNIV_SQL_NULL) ? len + 1 : 1) >
+		    *buf_len) {
+			fprintf (stderr,
+				 "WSREP: FK key len exceeded %lu %lu %lu\n",
+				 key_len, len, *buf_len);
+			goto func_exit;
+		}
+
+		if (len == UNIV_SQL_NULL) {
+			ut_a(!(col_f->prtype & DATA_NOT_NULL));
+			*buf++ = 1;
+			key_len++;
+		} else if (!new_protocol) {
+			if (!(col_r->prtype & DATA_NOT_NULL)) {
+				*buf++ = 0;
+				key_len++;
+			}
+			memcpy(buf, data, len);
+			/* NOTE(review): this branch overwrites *buf_len
+			with the sort result and does not advance buf or
+			key_len past the value. Kept unchanged because the
+			key presumably has to stay compatible with peers
+			using the old protocol -- confirm. */
+			*buf_len = wsrep_innobase_mysql_sort(
+				(int)(col_f->prtype & DATA_MYSQL_TYPE_MASK),
+				(uint)dtype_get_charset_coll(col_f->prtype),
+				buf, len, *buf_len);
+		} else { /* new protocol */
+			if (!(col_r->prtype & DATA_NOT_NULL)) {
+				*buf++ = 0;
+				key_len++;
+			}
+			switch (col_f->mtype) {
+			case DATA_INT:
+				/* Store the bytes in reverse order. The
+				indexed copy writes nothing for a
+				zero-length value. */
+				for (j = 0; j < len; j++) {
+					buf[len - 1 - j] = data[j];
+				}
+
+				/* Flip the top bit of the last stored byte
+				of a signed value */
+				if (len > 0
+				    && !(col_f->prtype & DATA_UNSIGNED)) {
+					buf[len - 1] = (byte)
+						(buf[len - 1] ^ 128);
+				}
+
+				break;
+			case DATA_VARCHAR:
+			case DATA_VARMYSQL:
+			case DATA_CHAR:
+			case DATA_MYSQL:
+				/* Copy the actual data */
+				ut_memcpy(buf, data, len);
+				len = wsrep_innobase_mysql_sort(
+					(int)
+					(col_f->prtype & DATA_MYSQL_TYPE_MASK),
+					(uint)
+					dtype_get_charset_coll(col_f->prtype),
+					buf, len, *buf_len);
+				break;
+			case DATA_BLOB:
+			case DATA_BINARY:
+				memcpy(buf, data, len);
+				break;
+			default:
+				break;
+			}
+
+			key_len += len;
+			buf += len;
+		}
+	}
+
+	/* offsets may live in heap, so validate before freeing it */
+	rec_validate(rec, offsets);
+
+	*buf_len = key_len;
+	err = DB_SUCCESS;
+
+func_exit:
+	if (UNIV_LIKELY_NULL(heap)) {
+		mem_heap_free(heap);
+	}
+
+	return(err);
+}
+#endif /* WITH_WSREP */
diff --git a/storage/xtradb/row/row0ins.c b/storage/xtradb/row/row0ins.c
index 97a4fb0767f..c40f985d987 100644
--- a/storage/xtradb/row/row0ins.c
+++ b/storage/xtradb/row/row0ins.c
@@ -759,6 +759,14 @@ row_ins_invalidate_query_cache(
innobase_invalidate_query_cache(thr_get_trx(thr), buf, len);
mem_free(buf);
}
+#ifdef WITH_WSREP
+ulint wsrep_append_foreign_key(trx_t *trx,
+ dict_foreign_t* foreign,
+ const rec_t* clust_rec,
+ dict_index_t* clust_index,
+ ibool referenced,
+ ibool shared);
+#endif /* WITH_WSREP */
/*********************************************************************//**
Perform referential actions or checks when a parent row is deleted or updated
@@ -1072,6 +1080,18 @@ row_ins_foreign_check_on_constraint(
cascade->state = UPD_NODE_UPDATE_CLUSTERED;
+#ifdef WITH_WSREP
+ err = wsrep_append_foreign_key(
+ thr_get_trx(thr),
+ foreign,
+ clust_rec,
+ clust_index,
+ FALSE, FALSE);
+ if (err != DB_SUCCESS) {
+ fprintf(stderr,
+ "WSREP: foreign key append failed: %lu\n", err);
+ } else
+#endif
err = row_update_cascade_for_mysql(thr, cascade,
foreign->foreign_table);
@@ -1411,7 +1431,14 @@ run_again:
if (check_ref) {
err = DB_SUCCESS;
-
+#ifdef WITH_WSREP
+ err = wsrep_append_foreign_key(
+ thr_get_trx(thr),
+ foreign,
+ rec,
+ check_index,
+ check_ref, TRUE);
+#endif /* WITH_WSREP */
goto end_scan;
} else if (foreign->type != 0) {
/* There is an ON UPDATE or ON DELETE
diff --git a/storage/xtradb/row/row0upd.c b/storage/xtradb/row/row0upd.c
index c0515c423db..33a7e45765e 100644
--- a/storage/xtradb/row/row0upd.c
+++ b/storage/xtradb/row/row0upd.c
@@ -51,6 +51,10 @@ Created 12/27/1996 Heikki Tuuri
#include "pars0sym.h"
#include "eval0eval.h"
#include "buf0lru.h"
+#ifdef WITH_WSREP
+#include "ha_prototypes.h"
+extern my_bool wsrep_debug;
+#endif
/* What kind of latch and lock can we assume when the control comes to
@@ -170,6 +174,50 @@ func_exit:
return(is_referenced);
}
+#ifdef WITH_WSREP
+/*********************************************************************//**
+Checks whether an index is used as the foreign key index of some
+constraint in the foreign_list of its table.
+@return TRUE if a constraint of the table has index as its foreign_index */
+static
+ibool
+wsrep_row_upd_index_is_foreign(
+/*===========================*/
+	dict_index_t*	index,	/*!< in: index */
+	trx_t*		trx)	/*!< in: transaction */
+{
+	dict_table_t*	table = index->table;
+	dict_foreign_t*	fk;
+	ibool		is_foreign = FALSE;
+	ibool		frozen;
+
+	if (UT_LIST_GET_FIRST(table->foreign_list) == NULL) {
+		/* The table has no constraints to search */
+		return(FALSE);
+	}
+
+	/* Freeze the data dictionary here only when the transaction has
+	no dict_operation_lock_mode set, and undo it before returning. */
+	frozen = (trx->dict_operation_lock_mode == 0);
+
+	if (frozen) {
+		row_mysql_freeze_data_dictionary(trx);
+	}
+
+	for (fk = UT_LIST_GET_FIRST(table->foreign_list);
+	     fk != NULL;
+	     fk = UT_LIST_GET_NEXT(foreign_list, fk)) {
+
+		if (fk->foreign_index == index) {
+			is_foreign = TRUE;
+			break;
+		}
+	}
+
+	if (frozen) {
+		row_mysql_unfreeze_data_dictionary(trx);
+	}
+
+	return(is_foreign);
+}
+#endif /* WITH_WSREP */
+
/*********************************************************************//**
Checks if possible foreign key constraints hold after a delete of the record
under pcur.
@@ -284,7 +332,127 @@ row_upd_check_references_constraints(
}
err = DB_SUCCESS;
+func_exit:
+ if (got_s_lock) {
+ row_mysql_unfreeze_data_dictionary(trx);
+ }
+
+ mem_heap_free(heap);
+
+ return(err);
+}
+#ifdef WITH_WSREP
+static
+ulint
+wsrep_row_upd_check_foreign_constraints(
+/*=================================*/
+ upd_node_t* node, /*!< in: row update node */
+ btr_pcur_t* pcur, /*!< in: cursor positioned on a record; NOTE: the
+ cursor position is lost in this function! */
+ dict_table_t* table, /*!< in: table in question */
+ dict_index_t* index, /*!< in: index of the cursor */
+ ulint* offsets,/*!< in/out: rec_get_offsets(pcur.rec, index) */
+ que_thr_t* thr, /*!< in: query thread */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ dict_foreign_t* foreign;
+ mem_heap_t* heap;
+ dtuple_t* entry;
+ trx_t* trx;
+ const rec_t* rec;
+ ulint n_ext;
+ ulint err;
+ ibool got_s_lock = FALSE;
+
+ if (UT_LIST_GET_FIRST(table->foreign_list) == NULL) {
+
+ return(DB_SUCCESS);
+ }
+
+ trx = thr_get_trx(thr);
+ if (wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
+
+ return(DB_SUCCESS);
+ }
+
+ /* TODO: make native slave thread bail out here */
+
+ rec = btr_pcur_get_rec(pcur);
+ ut_ad(rec_offs_validate(rec, index, offsets));
+ heap = mem_heap_create(500);
+
+ entry = row_rec_to_index_entry(ROW_COPY_DATA, rec, index, offsets,
+ &n_ext, heap);
+
+ mtr_commit(mtr);
+
+ mtr_start(mtr);
+
+ if (trx->dict_operation_lock_mode == 0) {
+ got_s_lock = TRUE;
+
+ row_mysql_freeze_data_dictionary(trx);
+ }
+
+ foreign = UT_LIST_GET_FIRST(table->foreign_list);
+
+ while (foreign) {
+ /* Note that we may have an update which updates the index
+ record, but does NOT update the first fields which are
+ referenced in a foreign key constraint. Then the update does
+ NOT break the constraint. */
+
+ if (foreign->foreign_index == index
+ && (node->is_delete
+ || row_upd_changes_first_fields_binary(
+ entry, index, node->update,
+ foreign->n_fields))) {
+
+ if (foreign->referenced_table == NULL) {
+ dict_table_get(foreign->referenced_table_name_lookup,
+ FALSE, DICT_ERR_IGNORE_NONE);
+ }
+
+ if (foreign->referenced_table) {
+ mutex_enter(&(dict_sys->mutex));
+
+ (foreign->referenced_table
+ ->n_foreign_key_checks_running)++;
+
+ mutex_exit(&(dict_sys->mutex));
+ }
+
+ /* NOTE that if the thread ends up waiting for a lock
+ we will release dict_operation_lock temporarily!
+ But the counter on the table protects 'foreign' from
+ being dropped while the check is running. */
+
+ err = row_ins_check_foreign_constraint(
+ TRUE, foreign, table, entry, thr);
+
+ if (foreign->referenced_table) {
+ mutex_enter(&(dict_sys->mutex));
+
+ ut_a(foreign->referenced_table
+ ->n_foreign_key_checks_running > 0);
+
+ (foreign->referenced_table
+ ->n_foreign_key_checks_running)--;
+
+ mutex_exit(&(dict_sys->mutex));
+ }
+
+ if (err != DB_SUCCESS) {
+
+ goto func_exit;
+ }
+ }
+
+ foreign = UT_LIST_GET_NEXT(foreign_list, foreign);
+ }
+
+ err = DB_SUCCESS;
func_exit:
if (got_s_lock) {
row_mysql_unfreeze_data_dictionary(trx);
@@ -294,6 +462,7 @@ func_exit:
return(err);
}
+#endif /* WITH_WSREP */
/*********************************************************************//**
Creates an update node for a query graph.
@@ -1582,10 +1751,16 @@ row_upd_sec_index_entry(
trx_t* trx = thr_get_trx(thr);
ulint mode = BTR_MODIFY_LEAF;
enum row_search_result search_result;
+#ifdef WITH_WSREP
+ ibool foreign;
+#endif /* WITH_WSREP */
index = node->index;
referenced = row_upd_index_is_referenced(index, trx);
+#ifdef WITH_WSREP
+ foreign = wsrep_row_upd_index_is_foreign(index, trx);
+#endif /* WITH_WSREP */
heap = mem_heap_create(1024);
@@ -1650,6 +1825,9 @@ row_upd_sec_index_entry(
if (!rec_get_deleted_flag(
rec, dict_table_is_comp(index->table))) {
+#ifdef WITH_WSREP
+ que_node_t *parent = que_node_get_parent(node);
+#endif /* WITH_WSREP */
err = btr_cur_del_mark_set_sec_rec(
0, btr_cur, TRUE, thr, &mtr);
@@ -1667,6 +1845,38 @@ row_upd_sec_index_entry(
node, &pcur, index->table,
index, offsets, thr, &mtr);
}
+#ifdef WITH_WSREP
+ if (err == DB_SUCCESS && !referenced &&
+ !(parent && que_node_get_type(parent) ==
+ QUE_NODE_UPDATE &&
+ ((upd_node_t*)parent)->cascade_node == node) &&
+ foreign
+ ) {
+ ulint* offsets =
+ rec_get_offsets(
+ rec, index, NULL,
+ ULINT_UNDEFINED, &heap);
+ err = wsrep_row_upd_check_foreign_constraints(
+ node, &pcur, index->table,
+ index, offsets, thr, &mtr);
+ switch (err) {
+ case DB_SUCCESS:
+ case DB_NO_REFERENCED_ROW:
+ err = DB_SUCCESS;
+ break;
+ case DB_DEADLOCK:
+ if (wsrep_debug)
+ fprintf (stderr,
+ "WSREP: sec index FK check fail for deadlock");
+ break;
+ default:
+ fprintf (stderr,
+ "WSREP: referenced FK check fail: %lu",
+ err);
+ break;
+ }
+ }
+#endif /* WITH_WSREP */
}
break;
}
@@ -1817,6 +2027,9 @@ row_upd_clust_rec_by_insert(
que_thr_t* thr, /*!< in: query thread */
ibool referenced,/*!< in: TRUE if index may be referenced in
a foreign key constraint */
+#ifdef WITH_WSREP
+ ibool foreign, /*!< in: TRUE if index is foreign key index */
+#endif /* WITH_WSREP */
mtr_t* mtr) /*!< in/out: mtr; gets committed here */
{
mem_heap_t* heap;
@@ -1830,6 +2043,9 @@ row_upd_clust_rec_by_insert(
rec_t* rec;
ulint* offsets = NULL;
+#ifdef WITH_WSREP
+ que_node_t *parent = que_node_get_parent(node);
+#endif /* WITH_WSREP */
ut_ad(node);
ut_ad(dict_index_is_clust(index));
@@ -1915,6 +2131,34 @@ err_exit:
goto err_exit;
}
}
+#ifdef WITH_WSREP
+ if (!referenced &&
+ !(parent && que_node_get_type(parent) == QUE_NODE_UPDATE &&
+ ((upd_node_t*)parent)->cascade_node == node) &&
+ foreign
+ ) {
+ err = wsrep_row_upd_check_foreign_constraints(
+ node, pcur, table, index, offsets, thr, mtr);
+ switch (err) {
+ case DB_SUCCESS:
+ case DB_NO_REFERENCED_ROW:
+ err = DB_SUCCESS;
+ break;
+ case DB_DEADLOCK:
+ if (wsrep_debug) fprintf (stderr,
+ "WSREP: insert FK check fail for deadlock");
+ break;
+ default:
+ fprintf (stderr,
+ "WSREP: referenced FK check fail: %lu",
+ err);
+ break;
+ }
+ if (err != DB_SUCCESS) {
+ goto err_exit;
+ }
+ }
+#endif /* WITH_WSREP */
}
mtr_commit(mtr);
@@ -2090,11 +2334,18 @@ row_upd_del_mark_clust_rec(
ibool referenced,
/*!< in: TRUE if index may be referenced in
a foreign key constraint */
+#ifdef WITH_WSREP
+ ibool foreign,/*!< in: TRUE if index is foreign key index */
+#endif /* WITH_WSREP */
mtr_t* mtr) /*!< in: mtr; gets committed here */
{
btr_pcur_t* pcur;
btr_cur_t* btr_cur;
ulint err;
+#ifdef WITH_WSREP
+ rec_t* rec;
+ que_node_t *parent = que_node_get_parent(node);
+#endif /* WITH_WSREP */
ut_ad(node);
ut_ad(dict_index_is_clust(index));
@@ -2111,15 +2362,49 @@ row_upd_del_mark_clust_rec(
/* Mark the clustered index record deleted; we do not have to check
locks, because we assume that we have an x-lock on the record */
+#ifdef WITH_WSREP
+ rec = btr_cur_get_rec(btr_cur);
+#endif /* WITH_WSREP */
+
err = btr_cur_del_mark_set_clust_rec(
BTR_NO_LOCKING_FLAG, btr_cur_get_block(btr_cur),
+#ifdef WITH_WSREP
+ rec, index, offsets, TRUE, thr, mtr);
+#else
btr_cur_get_rec(btr_cur), index, offsets, TRUE, thr, mtr);
+#endif /* WITH_WSREP */
if (err == DB_SUCCESS && referenced) {
/* NOTE that the following call loses the position of pcur ! */
err = row_upd_check_references_constraints(
node, pcur, index->table, index, offsets, thr, mtr);
}
+#ifdef WITH_WSREP
+	/* wsrep: FK check for a foreign key index that is not referenced; skipped when node is its parent's cascade node. */
+	if (err == DB_SUCCESS && !referenced &&
+	    !(parent && que_node_get_type(parent) == QUE_NODE_UPDATE &&
+	      ((upd_node_t*)parent)->cascade_node == node) &&
+	    thr_get_trx(thr) &&
+	    foreign) {
+		err = wsrep_row_upd_check_foreign_constraints(
+			node, pcur, index->table, index, offsets, thr, mtr);
+		switch (err) {
+		case DB_SUCCESS:
+		case DB_NO_REFERENCED_ROW:
+			err = DB_SUCCESS; /* a missing referenced row is not an error here */
+			break;
+		case DB_DEADLOCK:
+			if (wsrep_debug) fprintf (stderr,
+				"WSREP: clust rec FK check fail for deadlock\n");
+			break;
+		default:
+			fprintf (stderr,
+				"WSREP: clust rec referenced FK check fail: %lu\n",
+				(unsigned long) err);
+			break;
+		}
+	}
+#endif /* WITH_WSREP */
mtr_commit(mtr);
@@ -2148,11 +2433,17 @@ row_upd_clust_step(
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets;
ibool referenced;
+#ifdef WITH_WSREP
+ ibool foreign;
+#endif /* WITH_WSREP */
rec_offs_init(offsets_);
index = dict_table_get_first_index(node->table);
referenced = row_upd_index_is_referenced(index, thr_get_trx(thr));
+#ifdef WITH_WSREP
+ foreign = wsrep_row_upd_index_is_foreign(index, thr_get_trx(thr));
+#endif /* WITH_WSREP */
pcur = node->pcur;
@@ -2225,7 +2516,11 @@ row_upd_clust_step(
if (node->is_delete) {
err = row_upd_del_mark_clust_rec(
+#ifdef WITH_WSREP
+ node, index, offsets, thr, referenced, foreign, mtr);
+#else
node, index, offsets, thr, referenced, mtr);
+#endif /* WITH_WSREP */
if (err == DB_SUCCESS) {
node->state = UPD_NODE_UPDATE_ALL_SEC;
@@ -2276,7 +2571,11 @@ exit_func:
externally! */
err = row_upd_clust_rec_by_insert(
+#ifdef WITH_WSREP
+ node, index, thr, referenced, foreign, mtr);
+#else
node, index, thr, referenced, mtr);
+#endif /* WITH_WSREP */
if (err != DB_SUCCESS) {
diff --git a/storage/xtradb/srv/srv0srv.c b/storage/xtradb/srv/srv0srv.c
index 9555466c9bf..8d9b3a4a7c7 100644
--- a/storage/xtradb/srv/srv0srv.c
+++ b/storage/xtradb/srv/srv0srv.c
@@ -100,6 +100,10 @@ ulong innobase_thd_get_thread_id(const void* thd);
/* prototypes for new functions added to ha_innodb.cc */
ibool innobase_get_slow_log();
+#ifdef WITH_WSREP
+extern int wsrep_debug;
+extern int wsrep_trx_is_aborting(void *thd_ptr);
+#endif
/* The following counter is incremented whenever there is some user activity
in the server */
UNIV_INTERN ulint srv_activity_count = 0;
@@ -233,6 +237,10 @@ srv_printf_innodb_monitor() will request mutex acquisition
with mutex_enter(), which will wait until it gets the mutex. */
#define MUTEX_NOWAIT(mutex_skipped) ((mutex_skipped) < MAX_MUTEX_NOWAIT)
+#ifdef WITH_INNODB_DISALLOW_WRITES
+UNIV_INTERN os_event_t srv_allow_writes_event;
+#endif /* WITH_INNODB_DISALLOW_WRITES */
+
/** The sort order table of the MySQL latin1_swedish_ci character set
collation */
UNIV_INTERN const byte* srv_latin1_ordering;
@@ -380,6 +388,9 @@ struct srv_conc_slot_struct{
free to proceed; but
reserved may still be
TRUE at that point */
+#ifdef WITH_WSREP
+ void *thd; /*!< to see priority */
+#endif
UT_LIST_NODE_T(srv_conc_slot_t) srv_conc_queue; /*!< queue node */
};
@@ -1197,8 +1208,20 @@ srv_init(void)
conc_slot->reserved = FALSE;
conc_slot->event = os_event_create(NULL);
ut_a(conc_slot->event);
+#ifdef WITH_WSREP
+ conc_slot->thd = NULL;
+#endif /* WITH_WSREP */
}
+#ifdef WITH_INNODB_DISALLOW_WRITES
+ /* Writes have to be enabled on init or else we hang. Thus, we
+ always set the event here regardless of innobase_disallow_writes.
+ That flag will always be 0 at this point because it isn't settable
+ via my.cnf or command line arg. */
+ srv_allow_writes_event = os_event_create(NULL);
+ os_event_set(srv_allow_writes_event);
+#endif /* WITH_INNODB_DISALLOW_WRITES */
+
/* Initialize some INFORMATION SCHEMA internal structures */
trx_i_s_cache_init(trx_i_s_cache);
}
@@ -1247,6 +1270,23 @@ srv_general_init(void)
/* Maximum allowable purge history length. <=0 means 'infinite'. */
UNIV_INTERN ulong srv_max_purge_lag = 0;
+#ifdef WITH_WSREP
+UNIV_INTERN
+void
+wsrep_srv_conc_cancel_wait(
+/*=======================*/
+	trx_t*	trx)	/*!< in: transaction whose wsrep_event, when set, is signalled under srv_conc_mutex */
+{
+	os_fast_mutex_lock(&srv_conc_mutex);
+	if (trx->wsrep_event != NULL) {
+		if (wsrep_debug) {
+			fprintf(stderr, "WSREP: conc slot cancel\n");
+		}
+		os_event_set(trx->wsrep_event);
+	}
+	os_fast_mutex_unlock(&srv_conc_mutex);
+}
+#endif /* WITH_WSREP */
/*********************************************************************//**
Puts an OS thread to wait if there are too many concurrent threads
(>= srv_thread_concurrency) inside InnoDB. The threads wait in a FIFO queue. */
@@ -1368,6 +1408,18 @@ srv_conc_enter_innodb(
}
#endif
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
+ srv_conc_force_enter_innodb(trx);
+ return;
+ }
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_trx_is_aborting(trx->mysql_thd)) {
+ srv_conc_force_enter_innodb(trx);
+ return;
+ }
+#endif
os_fast_mutex_lock(&srv_conc_mutex);
retry:
if (trx->declared_to_be_inside_innodb) {
@@ -1461,6 +1513,9 @@ retry:
/* Add to the queue */
slot->reserved = TRUE;
slot->wait_ended = FALSE;
+#ifdef WITH_WSREP
+ slot->thd = trx->mysql_thd;
+#endif
UT_LIST_ADD_LAST(srv_conc_queue, srv_conc_queue, slot);
@@ -1468,6 +1523,19 @@ retry:
srv_conc_n_waiting_threads++;
+#ifdef WITH_WSREP
+	if (wsrep_on(trx->mysql_thd) && wsrep_trx_is_aborting(trx->mysql_thd)) {
+		/* NOTE(review): this returns while slot is still reserved and in srv_conc_queue with slot->thd set - confirm the slot gets released. */
+		srv_conc_n_waiting_threads--;
+		os_fast_mutex_unlock(&srv_conc_mutex);
+		if (wsrep_debug)
+			fprintf(stderr, "srv_conc_enter due to MUST_ABORT\n");
+		trx->declared_to_be_inside_innodb = TRUE;
+		trx->n_tickets_to_enter_innodb = SRV_FREE_TICKETS_TO_ENTER;
+		return;
+	}
+	trx->wsrep_event = slot->event; /* the event this thread is about to wait on */
+#endif /* WITH_WSREP */
os_fast_mutex_unlock(&srv_conc_mutex);
/* Go to wait for the event; when a thread leaves InnoDB it will
@@ -1491,6 +1559,9 @@ retry:
thd_wait_begin(trx->mysql_thd, THD_WAIT_USER_LOCK);
os_event_wait(slot->event);
thd_wait_end(trx->mysql_thd);
+#ifdef WITH_WSREP
+ trx->wsrep_event = NULL;
+#endif /* WITH_WSREP */
trx->op_info = "";
@@ -1508,6 +1579,9 @@ retry:
incremented the thread counter on behalf of this thread */
slot->reserved = FALSE;
+#ifdef WITH_WSREP
+ slot->thd = NULL;
+#endif
UT_LIST_REMOVE(srv_conc_queue, srv_conc_queue, slot);
@@ -1595,6 +1669,9 @@ srv_conc_force_exit_innodb(
trx->n_tickets_to_enter_innodb = 0;
if (srv_conc_n_threads < (lint)srv_thread_concurrency) {
+#ifdef WITH_WSREP
+ srv_conc_slot_t* wsrep_slot;
+#endif
/* Look for a slot where a thread is waiting and no other
thread has yet released the thread */
@@ -1604,6 +1681,19 @@ srv_conc_force_exit_innodb(
slot = UT_LIST_GET_NEXT(srv_conc_queue, slot);
}
+#ifdef WITH_WSREP
+ /* look for aborting trx, they must be released asap */
+ wsrep_slot= slot;
+ while (wsrep_slot && (wsrep_slot->wait_ended == TRUE ||
+ !wsrep_trx_is_aborting(wsrep_slot->thd))) {
+ wsrep_slot = UT_LIST_GET_NEXT(srv_conc_queue, wsrep_slot);
+ }
+ if (wsrep_slot) {
+ slot = wsrep_slot;
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: releasing aborting thd\n");
+ }
+#endif
if (slot != NULL) {
slot->wait_ended = TRUE;
@@ -1987,7 +2077,20 @@ srv_suspend_mysql_thread(
if (lock_wait_timeout < 100000000
&& wait_time > (double) lock_wait_timeout) {
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_BF(trx->mysql_thd, TRUE)) {
+ fprintf(stderr,
+ "WSREP: BF long lock wait ended after %.f sec\n",
+ wait_time);
+ srv_print_innodb_monitor = FALSE;
+ srv_print_innodb_lock_monitor = FALSE;
+ } else {
+#endif
trx->error_state = DB_LOCK_WAIT_TIMEOUT;
+#ifdef WITH_WSREP
+ }
+#endif
}
if (trx_is_interrupted(trx)) {
@@ -2786,6 +2889,27 @@ exit_func:
OS_THREAD_DUMMY_RETURN;
}
+#ifdef WITH_WSREP
+/*********************************************************************//**
+Checks whether a timed-out lock wait belongs to a wsrep BF (priority) thread.
+For such a thread both InnoDB monitor flags are switched on and srv_lock_timeout_thread_event is set.
+@return TRUE for a BF thread's wait, FALSE for a regular lock timeout */
+static ibool
+wsrep_is_BF_lock_timeout(
+/*====================*/
+	srv_slot_t*	slot)	/* in: lock slot to check for lock priority */
+{
+	void*	thd = thr_get_trx(slot->thr)->mysql_thd;
+	if (!wsrep_on(thd) || !wsrep_thd_is_BF(thd, TRUE)) {
+		return FALSE;
+	}
+	fprintf(stderr, "WSREP: BF lock wait long\n");
+	srv_print_innodb_monitor = TRUE;
+	srv_print_innodb_lock_monitor = TRUE;
+	os_event_set(srv_lock_timeout_thread_event);
+	return TRUE;
+}
+#endif /* WITH_WSREP */
/*********************************************************************//**
A thread which wakes up threads whose lock wait may have lasted too long.
@return a dummy parameter */
@@ -2854,8 +2978,14 @@ loop:
granted: in that case do nothing */
if (trx->wait_lock) {
+#ifdef WITH_WSREP
+ if (!wsrep_is_BF_lock_timeout(slot)) {
+#endif
lock_cancel_waiting_and_release(
trx->wait_lock);
+#ifdef WITH_WSREP
+ }
+#endif
}
}
}
@@ -2974,7 +3104,20 @@ loop:
if (sync_array_print_long_waits(&waiter, &sema)
&& sema == old_sema && os_thread_eq(waiter, old_waiter)) {
+#if defined(WITH_WSREP) && defined(WITH_INNODB_DISALLOW_WRITES)
+	if (srv_allow_writes_event->is_set) { /* count the long wait only while writes are allowed */
+#endif /* WITH_WSREP && WITH_INNODB_DISALLOW_WRITES */
fatal_cnt++;
+#if defined(WITH_WSREP) && defined(WITH_INNODB_DISALLOW_WRITES)
+	} else {
+		fprintf(stderr,
+			"WSREP: avoiding InnoDB self crash due to long "
+			"semaphore wait of > %lu seconds\n"
+			"Server is processing SST donor operation, "
+			"fatal_cnt now: %lu\n",
+			(ulong) srv_fatal_semaphore_wait_threshold, (ulong) fatal_cnt);
+	}
+#endif /* WITH_WSREP && WITH_INNODB_DISALLOW_WRITES */
if (fatal_cnt > 10) {
fprintf(stderr,
diff --git a/storage/xtradb/srv/srv0start.c b/storage/xtradb/srv/srv0start.c
index 7b336c81b10..6045130b680 100644
--- a/storage/xtradb/srv/srv0start.c
+++ b/storage/xtradb/srv/srv0start.c
@@ -91,7 +91,10 @@ Created 2/16/1996 Heikki Tuuri
# include "buf0lru.h" /* for buf_LRU_file_restore() */
# include "os0stacktrace.h"
-/** Log sequence number immediately after startup */
+#ifdef WITH_WSREP
+extern my_bool wsrep_recovery;
+#endif /* WITH_WSREP */
+ /** Log sequence number immediately after startup */
UNIV_INTERN ib_uint64_t srv_start_lsn;
/** Log sequence number at shutdown */
UNIV_INTERN ib_uint64_t srv_shutdown_lsn;
@@ -2107,6 +2110,10 @@ innobase_start_or_create_for_mysql(void)
os_thread_create(&srv_monitor_thread, NULL,
thread_ids + 4 + SRV_MAX_N_IO_THREADS);
+#ifdef WITH_WSREP
+ /* Don't start the LRU thread when recovery is on */
+ if (!wsrep_recovery) {
+#endif /* WITH_WSREP */
/* Create the thread which automaticaly dumps/restore buffer pool */
os_thread_create(&srv_LRU_dump_restore_thread, NULL,
thread_ids + 5 + SRV_MAX_N_IO_THREADS);
@@ -2115,6 +2122,9 @@ innobase_start_or_create_for_mysql(void)
synchronously */
if (srv_auto_lru_dump && srv_blocking_lru_restore)
buf_LRU_file_restore();
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
srv_is_being_started = FALSE;
diff --git a/storage/xtradb/trx/trx0roll.c b/storage/xtradb/trx/trx0roll.c
index 25c1d5d4692..2dde8900cda 100644
--- a/storage/xtradb/trx/trx0roll.c
+++ b/storage/xtradb/trx/trx0roll.c
@@ -42,6 +42,9 @@ Created 3/26/1996 Heikki Tuuri
#include "row0mysql.h"
#include "lock0lock.h"
#include "pars0pars.h"
+#ifdef WITH_WSREP
+#include "ha_prototypes.h"
+#endif /* WITH_WSREP */
/** This many pages must be undone before a truncate is tried within
rollback */
@@ -147,6 +150,12 @@ trx_rollback_for_mysql(
trx->op_info = "";
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ trx->was_chosen_as_deadlock_victim) {
+ trx->was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
return(err);
}
@@ -174,6 +183,12 @@ trx_rollback_last_sql_stat_for_mysql(
trx->op_info = "";
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ trx->was_chosen_as_deadlock_victim) {
+ trx->was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
return(err);
}
@@ -1123,6 +1138,12 @@ trx_rollback(
srv_que_task_enqueue_low(thr);
/* srv_que_task_enqueue_low(thr2); */
}
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ trx->was_chosen_as_deadlock_victim) {
+ trx->was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
}
/****************************************************************//**
@@ -1281,6 +1302,12 @@ trx_finish_rollback_off_kernel(
sig = next_sig;
}
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ trx->was_chosen_as_deadlock_victim) {
+ trx->was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
}
/*********************************************************************//**
diff --git a/storage/xtradb/trx/trx0sys.c b/storage/xtradb/trx/trx0sys.c
index a56e55c0e19..7d9397b8e2c 100644
--- a/storage/xtradb/trx/trx0sys.c
+++ b/storage/xtradb/trx/trx0sys.c
@@ -44,6 +44,10 @@ Created 3/26/1996 Heikki Tuuri
#include "os0file.h"
#include "read0read.h"
+#ifdef WITH_WSREP
+#include "ha_prototypes.h" /* wsrep_is_wsrep_xid() */
+#endif /* WITH_WSREP */
+
/** The file format tag structure with id and name. */
struct file_format_struct {
ulint id; /*!< id of the file format */
@@ -960,6 +964,125 @@ trx_sys_print_mysql_binlog_offset(void)
mtr_commit(&mtr);
}
+#ifdef WITH_WSREP
+
+#ifdef UNIV_DEBUG
+static long long trx_sys_cur_xid_seqno = -1;
+static unsigned char trx_sys_cur_xid_uuid[16];
+
+long long read_wsrep_xid_seqno(const XID* xid)
+{
+	long long	xid_seqno;	/* copied from xid->data, starting at byte offset 24 */
+	memcpy(&xid_seqno, &xid->data[24], sizeof xid_seqno);
+	return(xid_seqno);
+}
+
+void read_wsrep_xid_uuid(const XID* xid, unsigned char* buf)
+{
+	memcpy(buf, &xid->data[8], 16);	/* buf receives the 16 bytes at offset 8 of xid->data */
+}
+
+#endif /* UNIV_DEBUG */
+
+/** Writes the given XID into the wsrep XID area of the trx sys header:
+the magic number when it is not there yet, then formatID, gtrid_length,
+bqual_length and XIDDATASIZE bytes of data. All writes go through mtr. */
+void
+trx_sys_update_wsrep_checkpoint(
+	const XID*	xid,		/*!< in: transaction XID */
+	trx_sysf_t*	sys_header,	/*!< in: sys_header */
+	mtr_t*		mtr)		/*!< in: mtr */
+{
+	/* Validate the arguments before the debug block below reads *xid. */
+	ut_ad(xid && mtr && sys_header);
+
+#ifdef UNIV_DEBUG
+	{
+		/* Check that seqno is monotonically increasing */
+		unsigned char	xid_uuid[16];
+		long long	xid_seqno = read_wsrep_xid_seqno(xid);
+		read_wsrep_xid_uuid(xid, xid_uuid);
+		/* Compare the whole 16-byte UUID, not just a prefix. */
+		if (!memcmp(xid_uuid, trx_sys_cur_xid_uuid, sizeof(xid_uuid))) {
+			ut_ad(xid_seqno > trx_sys_cur_xid_seqno);
+		} else {
+			memcpy(trx_sys_cur_xid_uuid, xid_uuid, sizeof(xid_uuid));
+		}
+		trx_sys_cur_xid_seqno = xid_seqno;
+	}
+#endif /* UNIV_DEBUG */
+
+	ut_a(xid->formatID == -1 || wsrep_is_wsrep_xid(xid));
+
+	if (mach_read_from_4(sys_header + TRX_SYS_WSREP_XID_INFO
+			     + TRX_SYS_WSREP_XID_MAGIC_N_FLD)
+	    != TRX_SYS_WSREP_XID_MAGIC_N) {
+		mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+				 + TRX_SYS_WSREP_XID_MAGIC_N_FLD,
+				 TRX_SYS_WSREP_XID_MAGIC_N,
+				 MLOG_4BYTES, mtr);
+	}
+
+	mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+			 + TRX_SYS_WSREP_XID_FORMAT,
+			 (int)xid->formatID,
+			 MLOG_4BYTES, mtr);
+	mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+			 + TRX_SYS_WSREP_XID_GTRID_LEN,
+			 (int)xid->gtrid_length,
+			 MLOG_4BYTES, mtr);
+	mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+			 + TRX_SYS_WSREP_XID_BQUAL_LEN,
+			 (int)xid->bqual_length,
+			 MLOG_4BYTES, mtr);
+	mlog_write_string(sys_header + TRX_SYS_WSREP_XID_INFO
+			  + TRX_SYS_WSREP_XID_DATA,
+			  (const unsigned char*) xid->data,
+			  XIDDATASIZE, mtr);
+}
+
+/** Reads the wsrep XID stored in the trx sys header into *xid. When the
+header carries no wsrep magic number, *xid is zeroed, given formatID -1 and
+written to the header instead. */
+void
+trx_sys_read_wsrep_checkpoint(XID* xid)
+/*===================================*/
+{
+	mtr_t		mtr;
+	trx_sysf_t*	sys_header;
+	trx_sysf_t*	xid_info;	/* sys_header + TRX_SYS_WSREP_XID_INFO */
+	ulint		magic;
+
+	ut_ad(xid);
+
+	mtr_start(&mtr);
+	sys_header = trx_sysf_get(&mtr);
+	xid_info = sys_header + TRX_SYS_WSREP_XID_INFO;
+
+	magic = mach_read_from_4(xid_info + TRX_SYS_WSREP_XID_MAGIC_N_FLD);
+
+	if (magic == TRX_SYS_WSREP_XID_MAGIC_N) {
+		/* Magic number present: copy the stored fields out. */
+		xid->formatID = (int)mach_read_from_4(
+			xid_info + TRX_SYS_WSREP_XID_FORMAT);
+		xid->gtrid_length = (int)mach_read_from_4(
+			xid_info + TRX_SYS_WSREP_XID_GTRID_LEN);
+		xid->bqual_length = (int)mach_read_from_4(
+			xid_info + TRX_SYS_WSREP_XID_BQUAL_LEN);
+		ut_memcpy(xid->data, xid_info + TRX_SYS_WSREP_XID_DATA,
+			  XIDDATASIZE);
+	} else {
+		/* No magic number: return a zeroed XID with formatID -1 and store it. */
+		memset(xid, 0, sizeof(*xid));
+		xid->formatID = -1;
+		trx_sys_update_wsrep_checkpoint(xid, sys_header, &mtr);
+	}
+
+	mtr_commit(&mtr);
+}
+
+#endif /* WITH_WSREP */
+
/*****************************************************************//**
Reads the log coordinates at the given offset in the trx sys header. */
static
diff --git a/storage/xtradb/trx/trx0trx.c b/storage/xtradb/trx/trx0trx.c
index bc511504db9..27792e57e9c 100644
--- a/storage/xtradb/trx/trx0trx.c
+++ b/storage/xtradb/trx/trx0trx.c
@@ -333,6 +333,9 @@ trx_create(
/* Remember to free the vector explicitly. */
trx->autoinc_locks = ib_vector_create(
mem_heap_create(sizeof(ib_vector_t) + sizeof(void*) * 4), 4);
+#ifdef WITH_WSREP
+ trx->wsrep_event = NULL;
+#endif /* WITH_WSREP */
return(trx);
}
@@ -875,6 +878,11 @@ trx_start_low(
trx->id = trx_sys_get_new_trx_id();
+#ifdef WITH_WSREP
+ memset(&trx->xid, 0, sizeof(trx->xid));
+ trx->xid.formatID = -1;
+#endif /* WITH_WSREP */
+
/* The initial value for trx->no: IB_ULONGLONG_MAX is used in
read_view_open_now: */
@@ -1040,6 +1048,15 @@ trx_write_serialisation_history(
mutex_exit(&rseg->mutex);
+#ifdef WITH_WSREP
+ sys_header = trx_sysf_get(&mtr);
+ /* Update latest MySQL wsrep XID in trx sys header. */
+ if (wsrep_is_wsrep_xid(&trx->xid))
+ {
+ trx_sys_update_wsrep_checkpoint(&trx->xid, sys_header, &mtr);
+ }
+#endif /* WITH_WSREP */
+
/* Update the latest MySQL binlog name and offset info
in trx sys header if MySQL binlogging is on or the database
server is a MySQL replication slave */
@@ -1054,7 +1071,8 @@ trx_write_serialisation_history(
sys_header,
trx->mysql_log_file_name,
trx->mysql_log_offset,
- TRX_SYS_MYSQL_LOG_INFO, &mtr);
+ TRX_SYS_MYSQL_LOG_INFO,
+ &mtr);
trx->mysql_log_file_name = NULL;
}
@@ -1282,6 +1300,12 @@ trx_commit_off_kernel(
ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
ut_ad(UT_LIST_GET_LEN(trx->trx_locks) == 0);
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ trx->was_chosen_as_deadlock_victim) {
+ trx->was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
UT_LIST_REMOVE(trx_list, trx_sys->trx_list, trx);
ut_ad(trx_sys->descr_n_used <= UT_LIST_GET_LEN(trx_sys->trx_list));