summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorMonty <monty@mariadb.org>2021-01-17 16:06:43 +0200
committerSergei Golubchik <serg@mariadb.org>2021-05-19 22:54:13 +0200
commit6aa9a552c213246d4db6cfce78c75fd4b7f32df5 (patch)
treeebc124b95681bdbf9631a8f014a41aa5c3b54e63 /sql
parent7a588c30b19d9ee21be2755d70f7ed7fd3676c4b (diff)
downloadmariadb-git-6aa9a552c213246d4db6cfce78c75fd4b7f32df5.tar.gz
MDEV-24576 Atomic CREATE TABLE
There are a few different cases to consider Logging of CREATE TABLE and CREATE TABLE ... LIKE - If REPLACE is used and there was an existing table, DDL log the drop of the table. - If discovery of table is to be done - DDL LOG create table else - DDL log create table (with engine type) - create the table - If table was created - Log entry to binary log with xid - Mark DDL log completed Crash recovery: - If query was in binary log do nothing and exit - If discoverted table - Delete the .frm file -else - Drop created table and frm file - If table was dropped, write a DROP TABLE statement in binary log CREATE TABLE ... SELECT required a little more work as when one is using statement logging the query is written to the binary log before commit is done. This was fixed by adding a DROP TABLE to the binary log during crash recovery if the ddl log entry was not closed. In this case the binary log will contain: CREATE TABLE xxx ... SELECT .... DROP TABLE xxx; Other things: - Added debug_crash_here() functionality to Aria to be able to test crash in create table between the creation of the .MAI and the .MAD files.
Diffstat (limited to 'sql')
-rw-r--r--sql/ddl_log.cc88
-rw-r--r--sql/ddl_log.h14
-rw-r--r--sql/handler.cc1
-rw-r--r--sql/sql_alter.h6
-rw-r--r--sql/sql_class.h9
-rw-r--r--sql/sql_insert.cc56
-rw-r--r--sql/sql_table.cc143
-rw-r--r--sql/sql_table.h5
8 files changed, 282 insertions, 40 deletions
diff --git a/sql/ddl_log.cc b/sql/ddl_log.cc
index f6f4a3f43f2..db2c386f800 100644
--- a/sql/ddl_log.cc
+++ b/sql/ddl_log.cc
@@ -89,7 +89,7 @@ const char *ddl_log_action_name[DDL_LOG_LAST_ACTION]=
"partitioning replace", "partitioning exchange",
"rename table", "rename view",
"initialize drop table", "drop table",
- "drop view", "drop trigger", "drop db",
+ "drop view", "drop trigger", "drop db", "create table",
};
/* Number of phases per entry */
@@ -98,7 +98,7 @@ const uchar ddl_log_entry_phases[DDL_LOG_LAST_ACTION]=
0, 1, 1, 2,
(uchar) EXCH_PHASE_END, (uchar) DDL_RENAME_PHASE_END, 1, 1,
(uchar) DDL_DROP_PHASE_END, 1, 1,
- (uchar) DDL_DROP_DB_PHASE_END
+ (uchar) DDL_DROP_DB_PHASE_END, (uchar) DDL_CREATE_TABLE_PHASE_END
};
@@ -1499,6 +1499,59 @@ static int ddl_log_execute_action(THD *thd, MEM_ROOT *mem_root,
}
break;
}
+ case DDL_LOG_CREATE_TABLE_ACTION:
+ {
+ LEX_CSTRING db, table, path;
+ db= ddl_log_entry->db;
+ table= ddl_log_entry->name;
+ path= ddl_log_entry->tmp_name;
+
+ /* Don't delete the table if we didn't create it */
+ if (ddl_log_entry->flags == 0)
+ {
+ if (hton)
+ {
+ if ((error= hton->drop_table(hton, path.str)))
+ {
+ if (!non_existing_table_error(error))
+ break;
+ error= -1;
+ }
+ }
+ else
+ error= ha_delete_table_force(thd, path.str, &db, &table);
+ }
+ strxnmov(to_path, sizeof(to_path)-1, path.str, reg_ext, NullS);
+ mysql_file_delete(key_file_frm, to_path, MYF(MY_WME|MY_IGNORE_ENOENT));
+ if (ddl_log_entry->phase == DDL_CREATE_TABLE_PHASE_LOG)
+ {
+ /*
+ The server logged CREATE TABLE ... SELECT into binary log
+ before crashing. As the commit failed and we have delete the
+ table above, we have now to log the DROP of the created table.
+ */
+
+ String *query= &recovery_state.drop_table;
+ query->length(0);
+ query->append(STRING_WITH_LEN("DROP TABLE IF EXISTS "));
+ append_identifier(thd, query, &db);
+ query->append('.');
+ append_identifier(thd, query, &table);
+ query->append(&end_comment);
+
+ if (mysql_bin_log.is_open())
+ {
+ mysql_mutex_unlock(&LOCK_gdl);
+ (void) thd->binlog_query(THD::STMT_QUERY_TYPE,
+ query->ptr(), query->length(),
+ TRUE, FALSE, FALSE, 0);
+ mysql_mutex_lock(&LOCK_gdl);
+ }
+ }
+ (void) update_phase(entry_pos, DDL_LOG_FINAL_PHASE);
+ break;
+ }
+ break;
default:
DBUG_ASSERT(0);
break;
@@ -2417,3 +2470,34 @@ bool ddl_log_drop_db(THD *thd, DDL_LOG_STATE *ddl_state,
ddl_log_entry.tmp_name= *const_cast<LEX_CSTRING*>(path);
DBUG_RETURN(ddl_log_write(ddl_state, &ddl_log_entry));
}
+
+
+/**
+ Log CREATE TABLE
+
+ @param only_frm On recovery, only drop the .frm. This is needed for
+ example when deleting a table that was discovered.
+*/
+
+bool ddl_log_create_table(THD *thd, DDL_LOG_STATE *ddl_state,
+ handlerton *hton,
+ const LEX_CSTRING *path,
+ const LEX_CSTRING *db,
+ const LEX_CSTRING *table,
+ bool only_frm)
+{
+ DDL_LOG_ENTRY ddl_log_entry;
+ DBUG_ENTER("ddl_log_create_table");
+
+ bzero(&ddl_log_entry, sizeof(ddl_log_entry));
+ ddl_log_entry.action_type= DDL_LOG_CREATE_TABLE_ACTION;
+ if (hton)
+ lex_string_set(&ddl_log_entry.handler_name,
+ ha_resolve_storage_engine_name(hton));
+ ddl_log_entry.db= *const_cast<LEX_CSTRING*>(db);
+ ddl_log_entry.name= *const_cast<LEX_CSTRING*>(table);
+ ddl_log_entry.tmp_name= *const_cast<LEX_CSTRING*>(path);
+ ddl_log_entry.flags= only_frm;
+
+ DBUG_RETURN(ddl_log_write(ddl_state, &ddl_log_entry));
+}
diff --git a/sql/ddl_log.h b/sql/ddl_log.h
index 5aa039f46a5..368b887b6c5 100644
--- a/sql/ddl_log.h
+++ b/sql/ddl_log.h
@@ -82,6 +82,7 @@ enum ddl_log_action_code
DDL_LOG_DROP_VIEW_ACTION= 9,
DDL_LOG_DROP_TRIGGER_ACTION= 10,
DDL_LOG_DROP_DB_ACTION=11,
+ DDL_LOG_CREATE_TABLE_ACTION=12,
DDL_LOG_LAST_ACTION /* End marker */
};
@@ -118,6 +119,12 @@ enum enum_ddl_log_drop_db_phase {
DDL_DROP_DB_PHASE_END
};
+enum enum_ddl_log_create_table_phase {
+ DDL_CREATE_TABLE_PHASE_INIT=0,
+ DDL_CREATE_TABLE_PHASE_LOG,
+ DDL_CREATE_TABLE_PHASE_END
+};
+
/*
Setting ddl_log_entry.phase to this has the same effect as setting
the phase to the maximum phase (..PHASE_END) for an entry.
@@ -185,6 +192,7 @@ typedef struct st_ddl_log_state
DDL_LOG_MEMORY_ENTRY *list;
/* One execute entry per list */
DDL_LOG_MEMORY_ENTRY *execute_entry;
+ bool is_active() { return list != 0; }
} DDL_LOG_STATE;
@@ -252,5 +260,11 @@ bool ddl_log_drop_view(THD *thd, DDL_LOG_STATE *ddl_state,
const LEX_CSTRING *db);
bool ddl_log_drop_db(THD *thd, DDL_LOG_STATE *ddl_state,
const LEX_CSTRING *db, const LEX_CSTRING *path);
+bool ddl_log_create_table(THD *thd, DDL_LOG_STATE *ddl_state,
+ handlerton *hton,
+ const LEX_CSTRING *path,
+ const LEX_CSTRING *db,
+ const LEX_CSTRING *table,
+ bool only_frm);
extern mysql_mutex_t LOCK_gdl;
#endif /* DDL_LOG_INCLUDED */
diff --git a/sql/handler.cc b/sql/handler.cc
index 0f2533e6f11..82c0f11905a 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -4563,6 +4563,7 @@ bool non_existing_table_error(int error)
{
return (error == ENOENT ||
(error == EE_DELETE && my_errno == ENOENT) ||
+ error == EE_FILENOTFOUND ||
error == HA_ERR_NO_SUCH_TABLE ||
error == HA_ERR_UNSUPPORTED ||
error == ER_NO_SUCH_TABLE ||
diff --git a/sql/sql_alter.h b/sql/sql_alter.h
index 1c98ac1651d..b922cbf0637 100644
--- a/sql/sql_alter.h
+++ b/sql/sql_alter.h
@@ -294,6 +294,12 @@ public:
const char *get_tmp_path() const
{ return tmp_path; }
+ const LEX_CSTRING get_tmp_cstring_path() const
+ {
+ LEX_CSTRING tmp= { tmp_path, strlen(tmp_path) };
+ return tmp;
+ };
+
/**
Mark ALTER TABLE as needing to produce foreign key error if
it deletes a row from the table being changed.
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 77861611794..2df0651886f 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -39,7 +39,7 @@
#include "thr_lock.h" /* thr_lock_type, THR_LOCK_DATA, THR_LOCK_INFO */
#include "thr_timer.h"
#include "thr_malloc.h"
-#include "log_slow.h" /* LOG_SLOW_DISABLE_... */
+#include "log_slow.h" /* LOG_SLOW_DISABLE_... */
#include <my_tree.h>
#include "sql_digest_stream.h" // sql_digest_state
#include <mysql/psi/mysql_stage.h>
@@ -50,6 +50,7 @@
#include "session_tracker.h"
#include "backup.h"
#include "xa.h"
+#include "ddl_log.h" /* DDL_LOG_STATE */
extern "C"
void set_thd_stage_info(void *thd,
@@ -6027,6 +6028,7 @@ class select_create: public select_insert {
MYSQL_LOCK **m_plock;
bool exit_done;
TMP_TABLE_SHARE *saved_tmp_table_share;
+ DDL_LOG_STATE ddl_log_state_create, ddl_log_state_rm;
public:
select_create(THD *thd_arg, TABLE_LIST *table_arg,
@@ -6042,7 +6044,10 @@ public:
alter_info(alter_info_arg),
m_plock(NULL), exit_done(0),
saved_tmp_table_share(0)
- {}
+ {
+ bzero(&ddl_log_state_create, sizeof(ddl_log_state_create));
+ bzero(&ddl_log_state_rm, sizeof(ddl_log_state_rm));
+ }
int prepare(List<Item> &list, SELECT_LEX_UNIT *u);
void store_values(List<Item> &values);
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 52d76950dd0..8dde0ec8989 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -4375,7 +4375,8 @@ Field *Item::create_field_for_create_select(MEM_ROOT *root, TABLE *table)
*/
TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
- MYSQL_LOCK **lock, TABLEOP_HOOKS *hooks)
+ MYSQL_LOCK **lock,
+ TABLEOP_HOOKS *hooks)
{
TABLE tmp_table; // Used during 'Create_field()'
TABLE_SHARE share;
@@ -4473,7 +4474,8 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
open_table().
*/
- if (!mysql_create_table_no_lock(thd, &create_table->db,
+ if (!mysql_create_table_no_lock(thd, &ddl_log_state_create, &ddl_log_state_rm,
+ &create_table->db,
&create_table->table_name,
create_info, alter_info, NULL,
select_field_count, create_table))
@@ -4532,6 +4534,8 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
{
if (likely(!thd->is_error())) // CREATE ... IF NOT EXISTS
my_ok(thd); // succeed, but did nothing
+ ddl_log_complete(&ddl_log_state_rm);
+ ddl_log_complete(&ddl_log_state_create);
DBUG_RETURN(NULL);
}
@@ -4570,7 +4574,9 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
*lock= 0;
}
drop_open_table(thd, table, &create_table->db, &create_table->table_name);
- DBUG_RETURN(0);
+ ddl_log_complete(&ddl_log_state_rm);
+ ddl_log_complete(&ddl_log_state_create);
+ DBUG_RETURN(NULL);
/* purecov: end */
}
table->s->table_creation_was_logged= save_table_creation_was_logged;
@@ -4912,11 +4918,30 @@ bool select_create::send_eof()
if (thd->slave_thread)
thd->variables.binlog_annotate_row_events= 0;
+ debug_crash_here("ddl_log_create_before_binlog");
+
+ /*
+ In case of crash, we have to add DROP TABLE to the binary log as
+ the CREATE TABLE will already be logged if we are not using row based
+ replication.
+ */
+ if (!thd->is_current_stmt_binlog_format_row())
+ {
+ if (ddl_log_state_create.is_active()) // Not temporary table
+ ddl_log_update_phase(&ddl_log_state_create, DDL_CREATE_TABLE_PHASE_LOG);
+ /*
+ We can ignore if we replaced an old table as ddl_log_state_create will
+ now handle the logging of the drop if needed.
+ */
+ ddl_log_complete(&ddl_log_state_rm);
+ }
+
if (prepare_eof())
{
abort_result_set();
DBUG_RETURN(true);
}
+ debug_crash_here("ddl_log_create_after_prepare_eof");
if (table->s->tmp_table)
{
@@ -4983,9 +5008,15 @@ bool select_create::send_eof()
thd->get_stmt_da()->set_overwrite_status(true);
}
#endif /* WITH_WSREP */
+ thd->binlog_xid= thd->query_id;
+ /* Remember xid's for the case of row based logging */
+ ddl_log_update_xid(&ddl_log_state_create, thd->binlog_xid);
+ ddl_log_update_xid(&ddl_log_state_rm, thd->binlog_xid);
trans_commit_stmt(thd);
if (!(thd->variables.option_bits & OPTION_GTID_BEGIN))
trans_commit_implicit(thd);
+ thd->binlog_xid= 0;
+
#ifdef WITH_WSREP
if (WSREP(thd))
{
@@ -5004,6 +5035,15 @@ bool select_create::send_eof()
}
#endif /* WITH_WSREP */
}
+ /*
+ If are using statement based replication the table will be deleted here
+ in case of a crash as we can't use xid to check if the query was logged
+ (as the query was logged before commit!)
+ */
+ debug_crash_here("ddl_log_create_after_binlog");
+ ddl_log_complete(&ddl_log_state_rm);
+ ddl_log_complete(&ddl_log_state_create);
+ debug_crash_here("ddl_log_create_log_complete");
/*
exit_done must only be set after last potential call to
@@ -5120,9 +5160,19 @@ void select_create::abort_result_set()
binlog_reset_cache(thd);
/* Original table was deleted. We have to log it */
if (table_creation_was_logged)
+ {
+ thd->binlog_xid= thd->query_id;
+ ddl_log_update_xid(&ddl_log_state_create, thd->binlog_xid);
+ ddl_log_update_xid(&ddl_log_state_rm, thd->binlog_xid);
+ debug_crash_here("ddl_log_create_before_binlog");
log_drop_table(thd, &create_table->db, &create_table->table_name,
tmp_table);
+ debug_crash_here("ddl_log_create_after_binlog");
+ thd->binlog_xid= 0;
+ }
}
}
+ ddl_log_complete(&ddl_log_state_rm);
+ ddl_log_complete(&ddl_log_state_create);
DBUG_VOID_RETURN;
}
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index 12d387bc817..f00aaaee00a 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -1078,6 +1078,7 @@ static uint32 get_comment(THD *thd, uint32 comment_pos,
const uchar *query_end= (uchar*) query + thd->query_length();
const uchar *const state_map= thd->charset()->state_map;
+ *comment_start= ""; // Ensure it points to something
for (; query < query_end; query++)
{
if (state_map[static_cast<uchar>(*query)] == MY_LEX_SKIP)
@@ -4131,10 +4132,13 @@ err:
*/
static
-int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
+int create_table_impl(THD *thd,
+ DDL_LOG_STATE *ddl_log_state_create,
+ DDL_LOG_STATE *ddl_log_state_rm,
+ const LEX_CSTRING &orig_db,
const LEX_CSTRING &orig_table_name,
const LEX_CSTRING &db, const LEX_CSTRING &table_name,
- const char *path, const DDL_options_st options,
+ const LEX_CSTRING &path, const DDL_options_st options,
HA_CREATE_INFO *create_info, Alter_info *alter_info,
int create_table_mode, bool *is_trans, KEY **key_info,
uint *key_count, LEX_CUSTRING *frm)
@@ -4145,9 +4149,16 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
bool frm_only= create_table_mode == C_ALTER_TABLE_FRM_ONLY;
bool internal_tmp_table= create_table_mode == C_ALTER_TABLE || frm_only;
handlerton *exists_hton;
- DBUG_ENTER("mysql_create_table_no_lock");
+ DBUG_ENTER("create_table_impl");
DBUG_PRINT("enter", ("db: '%s' table: '%s' tmp: %d path: %s",
- db.str, table_name.str, internal_tmp_table, path));
+ db.str, table_name.str, internal_tmp_table, path.str));
+
+ /* Easy check for ddl logging if we are creating a temporary table */
+ if (create_info->tmp_table())
+ {
+ ddl_log_state_create= 0;
+ ddl_log_state_rm= 0;
+ }
if (fix_constraints_names(thd, &alter_info->check_constraint_list,
create_info))
@@ -4261,10 +4272,12 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
(void) trans_rollback_stmt(thd);
/* Remove normal table without logging. Keep tables locked */
if (mysql_rm_table_no_locks(thd, &table_list, &thd->db,
- (DDL_LOG_STATE*) 0,
+ ddl_log_state_rm,
0, 0, 0, 0, 1, 1))
goto err;
+ debug_crash_here("ddl_log_create_after_drop");
+
/*
We have to log this query, even if it failed later to ensure the
drop is done.
@@ -4327,7 +4340,7 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
goto err;
}
- init_tmp_table_share(thd, &share, db.str, 0, table_name.str, path);
+ init_tmp_table_share(thd, &share, db.str, 0, table_name.str, path.str);
/* prepare everything for discovery */
share.field= &no_fields;
@@ -4338,6 +4351,14 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
if (parse_engine_table_options(thd, hton, &share))
goto err;
+ /*
+ Log that we are going to do discovery. If things fails, any generated
+ .frm files are deleted
+ */
+ if (ddl_log_state_create)
+ ddl_log_create_table(thd, ddl_log_state_create, (handlerton*) 0, &path,
+ &db, &table_name, 1);
+
ha_err= hton->discover_table_structure(hton, thd, &share, create_info);
/*
@@ -4359,45 +4380,56 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
}
else
{
+ if (ddl_log_state_create)
+ ddl_log_create_table(thd, ddl_log_state_create, create_info->db_type,
+ &path, &db, &table_name, frm_only);
+ debug_crash_here("ddl_log_create_before_create_frm");
+
file= mysql_create_frm_image(thd, orig_db, orig_table_name, create_info,
alter_info, create_table_mode, key_info,
key_count, frm);
/*
- TODO: remove this check of thd->is_error() (now it intercept
- errors in some val_*() methoids and bring some single place to
- such error interception).
+ TODO: remove this check of thd->is_error() (now it intercept
+ errors in some val_*() methods and bring some single place to
+ such error interception).
*/
if (!file || thd->is_error())
+ {
+ if (!file)
+ deletefrm(path.str);
goto err;
+ }
if (thd->variables.keep_files_on_create)
create_info->options|= HA_CREATE_KEEP_FILES;
- if (file->ha_create_partitioning_metadata(path, NULL, CHF_CREATE_FLAG))
+ if (file->ha_create_partitioning_metadata(path.str, NULL, CHF_CREATE_FLAG))
goto err;
if (!frm_only)
{
- if (ha_create_table(thd, path, db.str, table_name.str, create_info,
+ debug_crash_here("ddl_log_create_before_create_table");
+ if (ha_create_table(thd, path.str, db.str, table_name.str, create_info,
frm, 0))
{
- file->ha_create_partitioning_metadata(path, NULL, CHF_DELETE_FLAG);
- deletefrm(path);
+ file->ha_create_partitioning_metadata(path.str, NULL, CHF_DELETE_FLAG);
+ deletefrm(path.str);
goto err;
}
+ debug_crash_here("ddl_log_create_after_create_table");
}
}
create_info->table= 0;
if (!frm_only && create_info->tmp_table())
{
- TABLE *table= thd->create_and_open_tmp_table(frm, path, db.str,
+ TABLE *table= thd->create_and_open_tmp_table(frm, path.str, db.str,
table_name.str,
false);
if (!table)
{
- (void) thd->rm_temporary_table(create_info->db_type, path);
+ (void) thd->rm_temporary_table(create_info->db_type, path.str);
goto err;
}
@@ -4410,6 +4442,12 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
error= 0;
err:
+ if (unlikely(error) && ddl_log_state_create)
+ {
+ /* Table was never created, so we can ignore the ddl log entry */
+ ddl_log_complete(ddl_log_state_create);
+ }
+
THD_STAGE_INFO(thd, stage_after_create);
delete file;
DBUG_PRINT("exit", ("return: %d", error));
@@ -4435,7 +4473,10 @@ warn:
-1 Table was used with IF NOT EXISTS and table existed (warning, not error)
*/
-int mysql_create_table_no_lock(THD *thd, const LEX_CSTRING *db,
+int mysql_create_table_no_lock(THD *thd,
+ DDL_LOG_STATE *ddl_log_state_create,
+ DDL_LOG_STATE *ddl_log_state_rm,
+ const LEX_CSTRING *db,
const LEX_CSTRING *table_name,
Table_specification_st *create_info,
Alter_info *alter_info, bool *is_trans,
@@ -4444,26 +4485,31 @@ int mysql_create_table_no_lock(THD *thd, const LEX_CSTRING *db,
KEY *not_used_1;
uint not_used_2;
int res;
+ uint path_length;
char path[FN_REFLEN + 1];
+ LEX_CSTRING cpath;
LEX_CUSTRING frm= {0,0};
if (create_info->tmp_table())
- build_tmptable_filename(thd, path, sizeof(path));
+ path_length= build_tmptable_filename(thd, path, sizeof(path));
else
{
- int length;
const LEX_CSTRING *alias= table_case_name(create_info, table_name);
- length= build_table_filename(path, sizeof(path) - 1, db->str, alias->str, "", 0);
+ path_length= build_table_filename(path, sizeof(path) - 1, db->str,
+ alias->str,
+ "", 0);
// Check if we hit FN_REFLEN bytes along with file extension.
- if (length+reg_ext_length > FN_REFLEN)
+ if (path_length+reg_ext_length > FN_REFLEN)
{
my_error(ER_IDENT_CAUSES_TOO_LONG_PATH, MYF(0), (int) sizeof(path)-1,
path);
return true;
}
}
+ lex_string_set3(&cpath, path, path_length);
- res= create_table_impl(thd, *db, *table_name, *db, *table_name, path,
+ res= create_table_impl(thd, ddl_log_state_create, ddl_log_state_rm,
+ *db, *table_name, *db, *table_name, cpath,
*create_info, create_info,
alter_info, create_table_mode,
is_trans, &not_used_1, &not_used_2, &frm);
@@ -4516,17 +4562,22 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
Table_specification_st *create_info,
Alter_info *alter_info)
{
- bool is_trans= FALSE;
- bool result;
- int create_table_mode;
TABLE_LIST *pos_in_locked_tables= 0;
MDL_ticket *mdl_ticket= 0;
+ DDL_LOG_STATE ddl_log_state_create, ddl_log_state_rm;
+ int create_table_mode;
+ uint save_thd_create_info_options;
+ bool is_trans= FALSE;
+ bool result;
DBUG_ENTER("mysql_create_table");
DBUG_ASSERT(create_table == thd->lex->query_tables);
+ bzero(&ddl_log_state_create, sizeof(ddl_log_state_create));
+ bzero(&ddl_log_state_rm, sizeof(ddl_log_state_rm));
+
/* Copy temporarily the statement flags to thd for lock_table_names() */
- uint save_thd_create_info_options= thd->lex->create_info.options;
+ save_thd_create_info_options= thd->lex->create_info.options;
thd->lex->create_info.options|= create_info->options;
/* Open or obtain an exclusive metadata lock on table being created */
@@ -4569,7 +4620,8 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
/* We can abort create table for any table type */
thd->abort_on_warning= thd->is_strict_mode();
- if (mysql_create_table_no_lock(thd, &create_table->db,
+ if (mysql_create_table_no_lock(thd, &ddl_log_state_create, &ddl_log_state_rm,
+ &create_table->db,
&create_table->table_name, create_info,
alter_info,
&is_trans, create_table_mode,
@@ -4641,10 +4693,19 @@ err:
*/
create_info->table->s->table_creation_was_logged= 1;
}
+ thd->binlog_xid= thd->query_id;
+ ddl_log_update_xid(&ddl_log_state_create, thd->binlog_xid);
+ if (ddl_log_state_rm.is_active())
+ ddl_log_update_xid(&ddl_log_state_rm, thd->binlog_xid);
+ debug_crash_here("ddl_log_create_before_binlog");
if (unlikely(write_bin_log(thd, result ? FALSE : TRUE, thd->query(),
thd->query_length(), is_trans)))
result= 1;
+ debug_crash_here("ddl_log_create_after_binlog");
+ thd->binlog_xid= 0;
}
+ ddl_log_complete(&ddl_log_state_rm);
+ ddl_log_complete(&ddl_log_state_create);
DBUG_RETURN(result);
}
@@ -4921,7 +4982,8 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
Table_specification_st local_create_info;
TABLE_LIST *pos_in_locked_tables= 0;
Alter_info local_alter_info;
- Alter_table_ctx local_alter_ctx; // Not used
+ Alter_table_ctx local_alter_ctx; // Not used
+ DDL_LOG_STATE ddl_log_state_create, ddl_log_state_rm;
int res= 1;
bool is_trans= FALSE;
bool do_logging= FALSE;
@@ -4930,6 +4992,9 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
int create_res;
DBUG_ENTER("mysql_create_like_table");
+ bzero(&ddl_log_state_create, sizeof(ddl_log_state_create));
+ bzero(&ddl_log_state_rm, sizeof(ddl_log_state_rm));
+
#ifdef WITH_WSREP
if (WSREP(thd) && !thd->wsrep_applier &&
wsrep_create_like_table(thd, table, src_table, create_info))
@@ -5023,7 +5088,9 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
pos_in_locked_tables= local_create_info.table->pos_in_locked_tables;
res= ((create_res=
- mysql_create_table_no_lock(thd, &table->db, &table->table_name,
+ mysql_create_table_no_lock(thd,
+ &ddl_log_state_create, &ddl_log_state_rm,
+ &table->db, &table->table_name,
&local_create_info, &local_alter_info,
&is_trans, C_ORDINARY_CREATE,
table)) > 0);
@@ -5240,22 +5307,33 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
err:
if (do_logging)
{
+ thd->binlog_xid= thd->query_id;
+ ddl_log_update_xid(&ddl_log_state_create, thd->binlog_xid);
+ if (ddl_log_state_rm.is_active())
+ ddl_log_update_xid(&ddl_log_state_rm, thd->binlog_xid);
+ debug_crash_here("ddl_log_create_before_binlog");
if (res && create_info->table_was_deleted)
{
/*
Table was not deleted. Original table was deleted.
We have to log it.
*/
- log_drop_table(thd, &table->db, &table->table_name, create_info->tmp_table());
+ DBUG_ASSERT(ddl_log_state_rm.is_active());
+ log_drop_table(thd, &table->db, &table->table_name,
+ create_info->tmp_table());
}
else if (res != 2) // Table was not dropped
{
if (write_bin_log(thd, res ? FALSE : TRUE, thd->query(),
thd->query_length(), is_trans))
- res= 1;
+ res= 1;
}
+ debug_crash_here("ddl_log_create_after_binlog");
+ thd->binlog_xid= 0;
}
+ ddl_log_complete(&ddl_log_state_rm);
+ ddl_log_complete(&ddl_log_state_create);
DBUG_RETURN(res != 0);
}
@@ -9662,9 +9740,10 @@ do_continue:;
tmp_disable_binlog(thd);
create_info->options|=HA_CREATE_TMP_ALTER;
create_info->alias= alter_ctx.table_name;
- error= create_table_impl(thd, alter_ctx.db, alter_ctx.table_name,
+ error= create_table_impl(thd, (DDL_LOG_STATE*) 0, (DDL_LOG_STATE*) 0,
+ alter_ctx.db, alter_ctx.table_name,
alter_ctx.new_db, alter_ctx.tmp_name,
- alter_ctx.get_tmp_path(),
+ alter_ctx.get_tmp_cstring_path(),
thd->lex->create_info, create_info, alter_info,
C_ALTER_TABLE_FRM_ONLY, NULL,
&key_info, &key_count, &frm);
diff --git a/sql/sql_table.h b/sql/sql_table.h
index a625461f621..0c694eb730c 100644
--- a/sql/sql_table.h
+++ b/sql/sql_table.h
@@ -125,7 +125,10 @@ bool add_keyword_to_query(THD *thd, String *result, const LEX_CSTRING *keyword,
#define C_ALTER_TABLE_FRM_ONLY -2
#define C_ASSISTED_DISCOVERY -3
-int mysql_create_table_no_lock(THD *thd, const LEX_CSTRING *db,
+int mysql_create_table_no_lock(THD *thd,
+ DDL_LOG_STATE *ddl_log_state,
+ DDL_LOG_STATE *ddl_log_state_rm,
+ const LEX_CSTRING *db,
const LEX_CSTRING *table_name,
Table_specification_st *create_info,
Alter_info *alter_info, bool *is_trans,