diff options
author | Monty <monty@mariadb.org> | 2021-01-17 16:06:43 +0200 |
---|---|---|
committer | Sergei Golubchik <serg@mariadb.org> | 2021-05-19 22:54:13 +0200 |
commit | 6aa9a552c213246d4db6cfce78c75fd4b7f32df5 (patch) | |
tree | ebc124b95681bdbf9631a8f014a41aa5c3b54e63 /sql | |
parent | 7a588c30b19d9ee21be2755d70f7ed7fd3676c4b (diff) | |
download | mariadb-git-6aa9a552c213246d4db6cfce78c75fd4b7f32df5.tar.gz |
MDEV-24576 Atomic CREATE TABLE
There are a few different cases to consider
Logging of CREATE TABLE and CREATE TABLE ... LIKE
- If REPLACE is used and there was an existing table, DDL log the drop of
the table.
- If discovery of table is to be done
- DDL LOG create table
else
- DDL log create table (with engine type)
- create the table
- If table was created
- Log entry to binary log with xid
- Mark DDL log completed
Crash recovery:
- If query was in binary log do nothing and exit
- If discoverted table
- Delete the .frm file
-else
- Drop created table and frm file
- If table was dropped, write a DROP TABLE statement in binary log
CREATE TABLE ... SELECT required a little more work as when one is using
statement logging the query is written to the binary log before commit is
done.
This was fixed by adding a DROP TABLE to the binary log during crash
recovery if the ddl log entry was not closed. In this case the binary log
will contain:
CREATE TABLE xxx ... SELECT ....
DROP TABLE xxx;
Other things:
- Added debug_crash_here() functionality to Aria to be able to test
crash in create table between the creation of the .MAI and the .MAD files.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/ddl_log.cc | 88 | ||||
-rw-r--r-- | sql/ddl_log.h | 14 | ||||
-rw-r--r-- | sql/handler.cc | 1 | ||||
-rw-r--r-- | sql/sql_alter.h | 6 | ||||
-rw-r--r-- | sql/sql_class.h | 9 | ||||
-rw-r--r-- | sql/sql_insert.cc | 56 | ||||
-rw-r--r-- | sql/sql_table.cc | 143 | ||||
-rw-r--r-- | sql/sql_table.h | 5 |
8 files changed, 282 insertions, 40 deletions
diff --git a/sql/ddl_log.cc b/sql/ddl_log.cc index f6f4a3f43f2..db2c386f800 100644 --- a/sql/ddl_log.cc +++ b/sql/ddl_log.cc @@ -89,7 +89,7 @@ const char *ddl_log_action_name[DDL_LOG_LAST_ACTION]= "partitioning replace", "partitioning exchange", "rename table", "rename view", "initialize drop table", "drop table", - "drop view", "drop trigger", "drop db", + "drop view", "drop trigger", "drop db", "create table", }; /* Number of phases per entry */ @@ -98,7 +98,7 @@ const uchar ddl_log_entry_phases[DDL_LOG_LAST_ACTION]= 0, 1, 1, 2, (uchar) EXCH_PHASE_END, (uchar) DDL_RENAME_PHASE_END, 1, 1, (uchar) DDL_DROP_PHASE_END, 1, 1, - (uchar) DDL_DROP_DB_PHASE_END + (uchar) DDL_DROP_DB_PHASE_END, (uchar) DDL_CREATE_TABLE_PHASE_END }; @@ -1499,6 +1499,59 @@ static int ddl_log_execute_action(THD *thd, MEM_ROOT *mem_root, } break; } + case DDL_LOG_CREATE_TABLE_ACTION: + { + LEX_CSTRING db, table, path; + db= ddl_log_entry->db; + table= ddl_log_entry->name; + path= ddl_log_entry->tmp_name; + + /* Don't delete the table if we didn't create it */ + if (ddl_log_entry->flags == 0) + { + if (hton) + { + if ((error= hton->drop_table(hton, path.str))) + { + if (!non_existing_table_error(error)) + break; + error= -1; + } + } + else + error= ha_delete_table_force(thd, path.str, &db, &table); + } + strxnmov(to_path, sizeof(to_path)-1, path.str, reg_ext, NullS); + mysql_file_delete(key_file_frm, to_path, MYF(MY_WME|MY_IGNORE_ENOENT)); + if (ddl_log_entry->phase == DDL_CREATE_TABLE_PHASE_LOG) + { + /* + The server logged CREATE TABLE ... SELECT into binary log + before crashing. As the commit failed and we have delete the + table above, we have now to log the DROP of the created table. + */ + + String *query= &recovery_state.drop_table; + query->length(0); + query->append(STRING_WITH_LEN("DROP TABLE IF EXISTS ")); + append_identifier(thd, query, &db); + query->append('.'); + append_identifier(thd, query, &table); + query->append(&end_comment); + + if (mysql_bin_log.is_open()) + { + mysql_mutex_unlock(&LOCK_gdl); + (void) thd->binlog_query(THD::STMT_QUERY_TYPE, + query->ptr(), query->length(), + TRUE, FALSE, FALSE, 0); + mysql_mutex_lock(&LOCK_gdl); + } + } + (void) update_phase(entry_pos, DDL_LOG_FINAL_PHASE); + break; + } + break; default: DBUG_ASSERT(0); break; @@ -2417,3 +2470,34 @@ bool ddl_log_drop_db(THD *thd, DDL_LOG_STATE *ddl_state, ddl_log_entry.tmp_name= *const_cast<LEX_CSTRING*>(path); DBUG_RETURN(ddl_log_write(ddl_state, &ddl_log_entry)); } + + +/** + Log CREATE TABLE + + @param only_frm On recovery, only drop the .frm. This is needed for + example when deleting a table that was discovered. +*/ + +bool ddl_log_create_table(THD *thd, DDL_LOG_STATE *ddl_state, + handlerton *hton, + const LEX_CSTRING *path, + const LEX_CSTRING *db, + const LEX_CSTRING *table, + bool only_frm) +{ + DDL_LOG_ENTRY ddl_log_entry; + DBUG_ENTER("ddl_log_create_table"); + + bzero(&ddl_log_entry, sizeof(ddl_log_entry)); + ddl_log_entry.action_type= DDL_LOG_CREATE_TABLE_ACTION; + if (hton) + lex_string_set(&ddl_log_entry.handler_name, + ha_resolve_storage_engine_name(hton)); + ddl_log_entry.db= *const_cast<LEX_CSTRING*>(db); + ddl_log_entry.name= *const_cast<LEX_CSTRING*>(table); + ddl_log_entry.tmp_name= *const_cast<LEX_CSTRING*>(path); + ddl_log_entry.flags= only_frm; + + DBUG_RETURN(ddl_log_write(ddl_state, &ddl_log_entry)); +} diff --git a/sql/ddl_log.h b/sql/ddl_log.h index 5aa039f46a5..368b887b6c5 100644 --- a/sql/ddl_log.h +++ b/sql/ddl_log.h @@ -82,6 +82,7 @@ enum ddl_log_action_code DDL_LOG_DROP_VIEW_ACTION= 9, DDL_LOG_DROP_TRIGGER_ACTION= 10, DDL_LOG_DROP_DB_ACTION=11, + DDL_LOG_CREATE_TABLE_ACTION=12, DDL_LOG_LAST_ACTION /* End marker */ }; @@ -118,6 +119,12 @@ enum enum_ddl_log_drop_db_phase { DDL_DROP_DB_PHASE_END }; +enum enum_ddl_log_create_table_phase { + DDL_CREATE_TABLE_PHASE_INIT=0, + DDL_CREATE_TABLE_PHASE_LOG, + DDL_CREATE_TABLE_PHASE_END +}; + /* Setting ddl_log_entry.phase to this has the same effect as setting the phase to the maximum phase (..PHASE_END) for an entry. @@ -185,6 +192,7 @@ typedef struct st_ddl_log_state DDL_LOG_MEMORY_ENTRY *list; /* One execute entry per list */ DDL_LOG_MEMORY_ENTRY *execute_entry; + bool is_active() { return list != 0; } } DDL_LOG_STATE; @@ -252,5 +260,11 @@ bool ddl_log_drop_view(THD *thd, DDL_LOG_STATE *ddl_state, const LEX_CSTRING *db); bool ddl_log_drop_db(THD *thd, DDL_LOG_STATE *ddl_state, const LEX_CSTRING *db, const LEX_CSTRING *path); +bool ddl_log_create_table(THD *thd, DDL_LOG_STATE *ddl_state, + handlerton *hton, + const LEX_CSTRING *path, + const LEX_CSTRING *db, + const LEX_CSTRING *table, + bool only_frm); extern mysql_mutex_t LOCK_gdl; #endif /* DDL_LOG_INCLUDED */ diff --git a/sql/handler.cc b/sql/handler.cc index 0f2533e6f11..82c0f11905a 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -4563,6 +4563,7 @@ bool non_existing_table_error(int error) { return (error == ENOENT || (error == EE_DELETE && my_errno == ENOENT) || + error == EE_FILENOTFOUND || error == HA_ERR_NO_SUCH_TABLE || error == HA_ERR_UNSUPPORTED || error == ER_NO_SUCH_TABLE || diff --git a/sql/sql_alter.h b/sql/sql_alter.h index 1c98ac1651d..b922cbf0637 100644 --- a/sql/sql_alter.h +++ b/sql/sql_alter.h @@ -294,6 +294,12 @@ public: const char *get_tmp_path() const { return tmp_path; } + const LEX_CSTRING get_tmp_cstring_path() const + { + LEX_CSTRING tmp= { tmp_path, strlen(tmp_path) }; + return tmp; + }; + /** Mark ALTER TABLE as needing to produce foreign key error if it deletes a row from the table being changed. diff --git a/sql/sql_class.h b/sql/sql_class.h index 77861611794..2df0651886f 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -39,7 +39,7 @@ #include "thr_lock.h" /* thr_lock_type, THR_LOCK_DATA, THR_LOCK_INFO */ #include "thr_timer.h" #include "thr_malloc.h" -#include "log_slow.h" /* LOG_SLOW_DISABLE_... */ +#include "log_slow.h" /* LOG_SLOW_DISABLE_... */ #include <my_tree.h> #include "sql_digest_stream.h" // sql_digest_state #include <mysql/psi/mysql_stage.h> @@ -50,6 +50,7 @@ #include "session_tracker.h" #include "backup.h" #include "xa.h" +#include "ddl_log.h" /* DDL_LOG_STATE */ extern "C" void set_thd_stage_info(void *thd, @@ -6027,6 +6028,7 @@ class select_create: public select_insert { MYSQL_LOCK **m_plock; bool exit_done; TMP_TABLE_SHARE *saved_tmp_table_share; + DDL_LOG_STATE ddl_log_state_create, ddl_log_state_rm; public: select_create(THD *thd_arg, TABLE_LIST *table_arg, @@ -6042,7 +6044,10 @@ public: alter_info(alter_info_arg), m_plock(NULL), exit_done(0), saved_tmp_table_share(0) - {} + { + bzero(&ddl_log_state_create, sizeof(ddl_log_state_create)); + bzero(&ddl_log_state_rm, sizeof(ddl_log_state_rm)); + } int prepare(List<Item> &list, SELECT_LEX_UNIT *u); void store_values(List<Item> &values); diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 52d76950dd0..8dde0ec8989 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -4375,7 +4375,8 @@ Field *Item::create_field_for_create_select(MEM_ROOT *root, TABLE *table) */ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items, - MYSQL_LOCK **lock, TABLEOP_HOOKS *hooks) + MYSQL_LOCK **lock, + TABLEOP_HOOKS *hooks) { TABLE tmp_table; // Used during 'Create_field()' TABLE_SHARE share; @@ -4473,7 +4474,8 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items, open_table(). */ - if (!mysql_create_table_no_lock(thd, &create_table->db, + if (!mysql_create_table_no_lock(thd, &ddl_log_state_create, &ddl_log_state_rm, + &create_table->db, &create_table->table_name, create_info, alter_info, NULL, select_field_count, create_table)) @@ -4532,6 +4534,8 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items, { if (likely(!thd->is_error())) // CREATE ... IF NOT EXISTS my_ok(thd); // succeed, but did nothing + ddl_log_complete(&ddl_log_state_rm); + ddl_log_complete(&ddl_log_state_create); DBUG_RETURN(NULL); } @@ -4570,7 +4574,9 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items, *lock= 0; } drop_open_table(thd, table, &create_table->db, &create_table->table_name); - DBUG_RETURN(0); + ddl_log_complete(&ddl_log_state_rm); + ddl_log_complete(&ddl_log_state_create); + DBUG_RETURN(NULL); /* purecov: end */ } table->s->table_creation_was_logged= save_table_creation_was_logged; @@ -4912,11 +4918,30 @@ bool select_create::send_eof() if (thd->slave_thread) thd->variables.binlog_annotate_row_events= 0; + debug_crash_here("ddl_log_create_before_binlog"); + + /* + In case of crash, we have to add DROP TABLE to the binary log as + the CREATE TABLE will already be logged if we are not using row based + replication. + */ + if (!thd->is_current_stmt_binlog_format_row()) + { + if (ddl_log_state_create.is_active()) // Not temporary table + ddl_log_update_phase(&ddl_log_state_create, DDL_CREATE_TABLE_PHASE_LOG); + /* + We can ignore if we replaced an old table as ddl_log_state_create will + now handle the logging of the drop if needed. + */ + ddl_log_complete(&ddl_log_state_rm); + } + if (prepare_eof()) { abort_result_set(); DBUG_RETURN(true); } + debug_crash_here("ddl_log_create_after_prepare_eof"); if (table->s->tmp_table) { @@ -4983,9 +5008,15 @@ bool select_create::send_eof() thd->get_stmt_da()->set_overwrite_status(true); } #endif /* WITH_WSREP */ + thd->binlog_xid= thd->query_id; + /* Remember xid's for the case of row based logging */ + ddl_log_update_xid(&ddl_log_state_create, thd->binlog_xid); + ddl_log_update_xid(&ddl_log_state_rm, thd->binlog_xid); trans_commit_stmt(thd); if (!(thd->variables.option_bits & OPTION_GTID_BEGIN)) trans_commit_implicit(thd); + thd->binlog_xid= 0; + #ifdef WITH_WSREP if (WSREP(thd)) { @@ -5004,6 +5035,15 @@ bool select_create::send_eof() } #endif /* WITH_WSREP */ } + /* + If are using statement based replication the table will be deleted here + in case of a crash as we can't use xid to check if the query was logged + (as the query was logged before commit!) + */ + debug_crash_here("ddl_log_create_after_binlog"); + ddl_log_complete(&ddl_log_state_rm); + ddl_log_complete(&ddl_log_state_create); + debug_crash_here("ddl_log_create_log_complete"); /* exit_done must only be set after last potential call to @@ -5120,9 +5160,19 @@ void select_create::abort_result_set() binlog_reset_cache(thd); /* Original table was deleted. We have to log it */ if (table_creation_was_logged) + { + thd->binlog_xid= thd->query_id; + ddl_log_update_xid(&ddl_log_state_create, thd->binlog_xid); + ddl_log_update_xid(&ddl_log_state_rm, thd->binlog_xid); + debug_crash_here("ddl_log_create_before_binlog"); log_drop_table(thd, &create_table->db, &create_table->table_name, tmp_table); + debug_crash_here("ddl_log_create_after_binlog"); + thd->binlog_xid= 0; + } } } + ddl_log_complete(&ddl_log_state_rm); + ddl_log_complete(&ddl_log_state_create); DBUG_VOID_RETURN; } diff --git a/sql/sql_table.cc b/sql/sql_table.cc index 12d387bc817..f00aaaee00a 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -1078,6 +1078,7 @@ static uint32 get_comment(THD *thd, uint32 comment_pos, const uchar *query_end= (uchar*) query + thd->query_length(); const uchar *const state_map= thd->charset()->state_map; + *comment_start= ""; // Ensure it points to something for (; query < query_end; query++) { if (state_map[static_cast<uchar>(*query)] == MY_LEX_SKIP) @@ -4131,10 +4132,13 @@ err: */ static -int create_table_impl(THD *thd, const LEX_CSTRING &orig_db, +int create_table_impl(THD *thd, + DDL_LOG_STATE *ddl_log_state_create, + DDL_LOG_STATE *ddl_log_state_rm, + const LEX_CSTRING &orig_db, const LEX_CSTRING &orig_table_name, const LEX_CSTRING &db, const LEX_CSTRING &table_name, - const char *path, const DDL_options_st options, + const LEX_CSTRING &path, const DDL_options_st options, HA_CREATE_INFO *create_info, Alter_info *alter_info, int create_table_mode, bool *is_trans, KEY **key_info, uint *key_count, LEX_CUSTRING *frm) @@ -4145,9 +4149,16 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db, bool frm_only= create_table_mode == C_ALTER_TABLE_FRM_ONLY; bool internal_tmp_table= create_table_mode == C_ALTER_TABLE || frm_only; handlerton *exists_hton; - DBUG_ENTER("mysql_create_table_no_lock"); + DBUG_ENTER("create_table_impl"); DBUG_PRINT("enter", ("db: '%s' table: '%s' tmp: %d path: %s", - db.str, table_name.str, internal_tmp_table, path)); + db.str, table_name.str, internal_tmp_table, path.str)); + + /* Easy check for ddl logging if we are creating a temporary table */ + if (create_info->tmp_table()) + { + ddl_log_state_create= 0; + ddl_log_state_rm= 0; + } if (fix_constraints_names(thd, &alter_info->check_constraint_list, create_info)) @@ -4261,10 +4272,12 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db, (void) trans_rollback_stmt(thd); /* Remove normal table without logging. Keep tables locked */ if (mysql_rm_table_no_locks(thd, &table_list, &thd->db, - (DDL_LOG_STATE*) 0, + ddl_log_state_rm, 0, 0, 0, 0, 1, 1)) goto err; + debug_crash_here("ddl_log_create_after_drop"); + /* We have to log this query, even if it failed later to ensure the drop is done. @@ -4327,7 +4340,7 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db, goto err; } - init_tmp_table_share(thd, &share, db.str, 0, table_name.str, path); + init_tmp_table_share(thd, &share, db.str, 0, table_name.str, path.str); /* prepare everything for discovery */ share.field= &no_fields; @@ -4338,6 +4351,14 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db, if (parse_engine_table_options(thd, hton, &share)) goto err; + /* + Log that we are going to do discovery. If things fails, any generated + .frm files are deleted + */ + if (ddl_log_state_create) + ddl_log_create_table(thd, ddl_log_state_create, (handlerton*) 0, &path, + &db, &table_name, 1); + ha_err= hton->discover_table_structure(hton, thd, &share, create_info); /* @@ -4359,45 +4380,56 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db, } else { + if (ddl_log_state_create) + ddl_log_create_table(thd, ddl_log_state_create, create_info->db_type, + &path, &db, &table_name, frm_only); + debug_crash_here("ddl_log_create_before_create_frm"); + file= mysql_create_frm_image(thd, orig_db, orig_table_name, create_info, alter_info, create_table_mode, key_info, key_count, frm); /* - TODO: remove this check of thd->is_error() (now it intercept - errors in some val_*() methoids and bring some single place to - such error interception). + TODO: remove this check of thd->is_error() (now it intercept + errors in some val_*() methods and bring some single place to + such error interception). */ if (!file || thd->is_error()) + { + if (!file) + deletefrm(path.str); goto err; + } if (thd->variables.keep_files_on_create) create_info->options|= HA_CREATE_KEEP_FILES; - if (file->ha_create_partitioning_metadata(path, NULL, CHF_CREATE_FLAG)) + if (file->ha_create_partitioning_metadata(path.str, NULL, CHF_CREATE_FLAG)) goto err; if (!frm_only) { - if (ha_create_table(thd, path, db.str, table_name.str, create_info, + debug_crash_here("ddl_log_create_before_create_table"); + if (ha_create_table(thd, path.str, db.str, table_name.str, create_info, frm, 0)) { - file->ha_create_partitioning_metadata(path, NULL, CHF_DELETE_FLAG); - deletefrm(path); + file->ha_create_partitioning_metadata(path.str, NULL, CHF_DELETE_FLAG); + deletefrm(path.str); goto err; } + debug_crash_here("ddl_log_create_after_create_table"); } } create_info->table= 0; if (!frm_only && create_info->tmp_table()) { - TABLE *table= thd->create_and_open_tmp_table(frm, path, db.str, + TABLE *table= thd->create_and_open_tmp_table(frm, path.str, db.str, table_name.str, false); if (!table) { - (void) thd->rm_temporary_table(create_info->db_type, path); + (void) thd->rm_temporary_table(create_info->db_type, path.str); goto err; } @@ -4410,6 +4442,12 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db, error= 0; err: + if (unlikely(error) && ddl_log_state_create) + { + /* Table was never created, so we can ignore the ddl log entry */ + ddl_log_complete(ddl_log_state_create); + } + THD_STAGE_INFO(thd, stage_after_create); delete file; DBUG_PRINT("exit", ("return: %d", error)); @@ -4435,7 +4473,10 @@ warn: -1 Table was used with IF NOT EXISTS and table existed (warning, not error) */ -int mysql_create_table_no_lock(THD *thd, const LEX_CSTRING *db, +int mysql_create_table_no_lock(THD *thd, + DDL_LOG_STATE *ddl_log_state_create, + DDL_LOG_STATE *ddl_log_state_rm, + const LEX_CSTRING *db, const LEX_CSTRING *table_name, Table_specification_st *create_info, Alter_info *alter_info, bool *is_trans, @@ -4444,26 +4485,31 @@ int mysql_create_table_no_lock(THD *thd, const LEX_CSTRING *db, KEY *not_used_1; uint not_used_2; int res; + uint path_length; char path[FN_REFLEN + 1]; + LEX_CSTRING cpath; LEX_CUSTRING frm= {0,0}; if (create_info->tmp_table()) - build_tmptable_filename(thd, path, sizeof(path)); + path_length= build_tmptable_filename(thd, path, sizeof(path)); else { - int length; const LEX_CSTRING *alias= table_case_name(create_info, table_name); - length= build_table_filename(path, sizeof(path) - 1, db->str, alias->str, "", 0); + path_length= build_table_filename(path, sizeof(path) - 1, db->str, + alias->str, + "", 0); // Check if we hit FN_REFLEN bytes along with file extension. - if (length+reg_ext_length > FN_REFLEN) + if (path_length+reg_ext_length > FN_REFLEN) { my_error(ER_IDENT_CAUSES_TOO_LONG_PATH, MYF(0), (int) sizeof(path)-1, path); return true; } } + lex_string_set3(&cpath, path, path_length); - res= create_table_impl(thd, *db, *table_name, *db, *table_name, path, + res= create_table_impl(thd, ddl_log_state_create, ddl_log_state_rm, + *db, *table_name, *db, *table_name, cpath, *create_info, create_info, alter_info, create_table_mode, is_trans, ¬_used_1, ¬_used_2, &frm); @@ -4516,17 +4562,22 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table, Table_specification_st *create_info, Alter_info *alter_info) { - bool is_trans= FALSE; - bool result; - int create_table_mode; TABLE_LIST *pos_in_locked_tables= 0; MDL_ticket *mdl_ticket= 0; + DDL_LOG_STATE ddl_log_state_create, ddl_log_state_rm; + int create_table_mode; + uint save_thd_create_info_options; + bool is_trans= FALSE; + bool result; DBUG_ENTER("mysql_create_table"); DBUG_ASSERT(create_table == thd->lex->query_tables); + bzero(&ddl_log_state_create, sizeof(ddl_log_state_create)); + bzero(&ddl_log_state_rm, sizeof(ddl_log_state_rm)); + /* Copy temporarily the statement flags to thd for lock_table_names() */ - uint save_thd_create_info_options= thd->lex->create_info.options; + save_thd_create_info_options= thd->lex->create_info.options; thd->lex->create_info.options|= create_info->options; /* Open or obtain an exclusive metadata lock on table being created */ @@ -4569,7 +4620,8 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table, /* We can abort create table for any table type */ thd->abort_on_warning= thd->is_strict_mode(); - if (mysql_create_table_no_lock(thd, &create_table->db, + if (mysql_create_table_no_lock(thd, &ddl_log_state_create, &ddl_log_state_rm, + &create_table->db, &create_table->table_name, create_info, alter_info, &is_trans, create_table_mode, @@ -4641,10 +4693,19 @@ err: */ create_info->table->s->table_creation_was_logged= 1; } + thd->binlog_xid= thd->query_id; + ddl_log_update_xid(&ddl_log_state_create, thd->binlog_xid); + if (ddl_log_state_rm.is_active()) + ddl_log_update_xid(&ddl_log_state_rm, thd->binlog_xid); + debug_crash_here("ddl_log_create_before_binlog"); if (unlikely(write_bin_log(thd, result ? FALSE : TRUE, thd->query(), thd->query_length(), is_trans))) result= 1; + debug_crash_here("ddl_log_create_after_binlog"); + thd->binlog_xid= 0; } + ddl_log_complete(&ddl_log_state_rm); + ddl_log_complete(&ddl_log_state_create); DBUG_RETURN(result); } @@ -4921,7 +4982,8 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table, Table_specification_st local_create_info; TABLE_LIST *pos_in_locked_tables= 0; Alter_info local_alter_info; - Alter_table_ctx local_alter_ctx; // Not used + Alter_table_ctx local_alter_ctx; // Not used + DDL_LOG_STATE ddl_log_state_create, ddl_log_state_rm; int res= 1; bool is_trans= FALSE; bool do_logging= FALSE; @@ -4930,6 +4992,9 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table, int create_res; DBUG_ENTER("mysql_create_like_table"); + bzero(&ddl_log_state_create, sizeof(ddl_log_state_create)); + bzero(&ddl_log_state_rm, sizeof(ddl_log_state_rm)); + #ifdef WITH_WSREP if (WSREP(thd) && !thd->wsrep_applier && wsrep_create_like_table(thd, table, src_table, create_info)) @@ -5023,7 +5088,9 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table, pos_in_locked_tables= local_create_info.table->pos_in_locked_tables; res= ((create_res= - mysql_create_table_no_lock(thd, &table->db, &table->table_name, + mysql_create_table_no_lock(thd, + &ddl_log_state_create, &ddl_log_state_rm, + &table->db, &table->table_name, &local_create_info, &local_alter_info, &is_trans, C_ORDINARY_CREATE, table)) > 0); @@ -5240,22 +5307,33 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table, err: if (do_logging) { + thd->binlog_xid= thd->query_id; + ddl_log_update_xid(&ddl_log_state_create, thd->binlog_xid); + if (ddl_log_state_rm.is_active()) + ddl_log_update_xid(&ddl_log_state_rm, thd->binlog_xid); + debug_crash_here("ddl_log_create_before_binlog"); if (res && create_info->table_was_deleted) { /* Table was not deleted. Original table was deleted. We have to log it. */ - log_drop_table(thd, &table->db, &table->table_name, create_info->tmp_table()); + DBUG_ASSERT(ddl_log_state_rm.is_active()); + log_drop_table(thd, &table->db, &table->table_name, + create_info->tmp_table()); } else if (res != 2) // Table was not dropped { if (write_bin_log(thd, res ? FALSE : TRUE, thd->query(), thd->query_length(), is_trans)) - res= 1; + res= 1; } + debug_crash_here("ddl_log_create_after_binlog"); + thd->binlog_xid= 0; } + ddl_log_complete(&ddl_log_state_rm); + ddl_log_complete(&ddl_log_state_create); DBUG_RETURN(res != 0); } @@ -9662,9 +9740,10 @@ do_continue:; tmp_disable_binlog(thd); create_info->options|=HA_CREATE_TMP_ALTER; create_info->alias= alter_ctx.table_name; - error= create_table_impl(thd, alter_ctx.db, alter_ctx.table_name, + error= create_table_impl(thd, (DDL_LOG_STATE*) 0, (DDL_LOG_STATE*) 0, + alter_ctx.db, alter_ctx.table_name, alter_ctx.new_db, alter_ctx.tmp_name, - alter_ctx.get_tmp_path(), + alter_ctx.get_tmp_cstring_path(), thd->lex->create_info, create_info, alter_info, C_ALTER_TABLE_FRM_ONLY, NULL, &key_info, &key_count, &frm); diff --git a/sql/sql_table.h b/sql/sql_table.h index a625461f621..0c694eb730c 100644 --- a/sql/sql_table.h +++ b/sql/sql_table.h @@ -125,7 +125,10 @@ bool add_keyword_to_query(THD *thd, String *result, const LEX_CSTRING *keyword, #define C_ALTER_TABLE_FRM_ONLY -2 #define C_ASSISTED_DISCOVERY -3 -int mysql_create_table_no_lock(THD *thd, const LEX_CSTRING *db, +int mysql_create_table_no_lock(THD *thd, + DDL_LOG_STATE *ddl_log_state, + DDL_LOG_STATE *ddl_log_state_rm, + const LEX_CSTRING *db, const LEX_CSTRING *table_name, Table_specification_st *create_info, Alter_info *alter_info, bool *is_trans, |