diff options
Diffstat (limited to 'sql/sql_insert.cc')
| -rw-r--r-- | sql/sql_insert.cc | 327 |
1 files changed, 178 insertions, 149 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 9db6acf73f8..c969725bea4 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -82,6 +82,10 @@ #include "debug_sync.h" +#ifdef WITH_WSREP +#include "wsrep_trans_observer.h" /* wsrep_start_transction() */ +#endif /* WITH_WSREP */ + #ifndef EMBEDDED_LIBRARY static bool delayed_get_table(THD *thd, MDL_request *grl_protection_request, TABLE_LIST *table_list); @@ -241,7 +245,7 @@ static int check_insert_fields(THD *thd, TABLE_LIST *table_list, } else { // Part field list - SELECT_LEX *select_lex= &thd->lex->select_lex; + SELECT_LEX *select_lex= thd->lex->first_select_lex(); Name_resolution_context *context= &select_lex->context; Name_resolution_context_state ctx_state; int res; @@ -273,7 +277,7 @@ static int check_insert_fields(THD *thd, TABLE_LIST *table_list, /* Restore the current context. */ ctx_state.restore_state(context, table_list); - thd->lex->select_lex.no_wrap_view_item= FALSE; + thd->lex->first_select_lex()->no_wrap_view_item= FALSE; if (res) DBUG_RETURN(-1); @@ -547,10 +551,10 @@ bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list) If this goes ok, the tickets are cloned and added to the list of granted locks held by the handler thread. */ - if (thd->global_read_lock.can_acquire_protection()) + if (thd->has_read_only_protection()) DBUG_RETURN(TRUE); - protection_request.init(MDL_key::GLOBAL, "", "", MDL_INTENTION_EXCLUSIVE, + protection_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_DML, MDL_STATEMENT); if (thd->mdl_context.acquire_lock(&protection_request, @@ -640,7 +644,7 @@ create_insert_stmt_from_insert_delayed(THD *thd, String *buf) if (buf->append(thd->query()) || buf->replace(thd->lex->keyword_delayed_begin_offset, thd->lex->keyword_delayed_end_offset - - thd->lex->keyword_delayed_begin_offset, 0)) + thd->lex->keyword_delayed_begin_offset, NULL, 0)) return 1; return 0; } @@ -657,7 +661,7 @@ static void save_insert_query_plan(THD* thd, TABLE_LIST *table_list) bool skip= MY_TEST(table_list->view); /* Save subquery children */ - for (SELECT_LEX_UNIT *unit= thd->lex->select_lex.first_inner_unit(); + for (SELECT_LEX_UNIT *unit= thd->lex->first_select_lex()->first_inner_unit(); unit; unit= unit->next_unit()) { @@ -777,7 +781,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, /* mysql_prepare_insert sets table_list->table if it was not set */ table= table_list->table; - context= &thd->lex->select_lex.context; + context= &thd->lex->first_select_lex()->context; /* These three asserts test the hypothesis that the resetting of the name resolution context below is not necessary at all since the list of local @@ -900,6 +904,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, using_bulk_insert= 1; table->file->ha_start_bulk_insert(values_list.elements); } + else + table->file->ha_reset_copy_info(); } thd->abort_on_warning= !ignore && thd->is_strict_mode(); @@ -1079,7 +1085,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, } while (bulk_parameters_iterations(thd)); values_loop_end: - free_underlaid_joins(thd, &thd->lex->select_lex); + free_underlaid_joins(thd, thd->lex->first_select_lex()); joins_freed= TRUE; /* @@ -1103,11 +1109,23 @@ values_loop_end: auto_inc values from the delayed_insert thread as they share TABLE. */ table->file->ha_release_auto_increment(); - if (using_bulk_insert && unlikely(table->file->ha_end_bulk_insert()) && - !error) + if (using_bulk_insert) + { + if (unlikely(table->file->ha_end_bulk_insert()) && + !error) + { + table->file->print_error(my_errno,MYF(0)); + error=1; + } + } + /* Get better status from handler if handler supports it */ + if (table->file->copy_info.records) { - table->file->print_error(my_errno,MYF(0)); - error=1; + DBUG_ASSERT(info.copied >= table->file->copy_info.copied); + info.touched= table->file->copy_info.touched; + info.copied= table->file->copy_info.copied; + info.deleted= table->file->copy_info.deleted; + info.updated= table->file->copy_info.updated; } if (duplic != DUP_ERROR || ignore) { @@ -1230,8 +1248,12 @@ values_loop_end: retval= thd->lex->explain->send_explain(thd); goto abort; } - if ((iteration * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) || - !thd->cuted_fields)) + DBUG_PRINT("info", ("touched: %llu copied: %llu updated: %llu deleted: %llu", + (ulonglong) info.touched, (ulonglong) info.copied, + (ulonglong) info.updated, (ulonglong) info.deleted)); + + if ((iteration * values_list.elements) == 1 && + (!(thd->variables.option_bits & OPTION_WARNINGS) || !thd->cuted_fields)) { my_ok(thd, info.copied + info.deleted + ((thd->client_capabilities & CLIENT_FOUND_ROWS) ? @@ -1272,7 +1294,7 @@ abort: table->file->ha_release_auto_increment(); if (!joins_freed) - free_underlaid_joins(thd, &thd->lex->select_lex); + free_underlaid_joins(thd, thd->lex->first_select_lex()); thd->abort_on_warning= 0; DBUG_RETURN(retval); } @@ -1302,7 +1324,7 @@ abort: static bool check_view_insertability(THD * thd, TABLE_LIST *view) { - uint num= view->view->select_lex.item_list.elements; + uint num= view->view->first_select_lex()->item_list.elements; TABLE *table= view->table; Field_translator *trans_start= view->field_translation, *trans_end= trans_start + num; @@ -1402,10 +1424,12 @@ static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list, than INSERT. */ - if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context, - &thd->lex->select_lex.top_join_list, + if (setup_tables_and_check_access(thd, + &thd->lex->first_select_lex()->context, + &thd->lex->first_select_lex()-> + top_join_list, table_list, - thd->lex->select_lex.leaf_tables, + thd->lex->first_select_lex()->leaf_tables, select_insert, INSERT_ACL, SELECT_ACL, TRUE)) DBUG_RETURN(TRUE); @@ -1413,7 +1437,7 @@ static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list, if (insert_into_view && !fields.elements) { thd->lex->empty_field_list_on_rset= 1; - if (!thd->lex->select_lex.leaf_tables.head()->table || + if (!thd->lex->first_select_lex()->leaf_tables.head()->table || table_list->is_multitable()) { my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0), @@ -1487,7 +1511,7 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, enum_duplicates duplic, COND **where, bool select_insert) { - SELECT_LEX *select_lex= &thd->lex->select_lex; + SELECT_LEX *select_lex= thd->lex->first_select_lex(); Name_resolution_context *context= &select_lex->context; Name_resolution_context_state ctx_state; bool insert_into_view= (table_list->view != 0); @@ -1737,7 +1761,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) */ if (info->ignore) { - table->file->print_error(error, MYF(ME_JUST_WARNING)); + table->file->print_error(error, MYF(ME_WARNING)); goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */ } goto err; @@ -1756,10 +1780,8 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) was used. This ensures that we don't get a problem when the whole range of the key has been used. */ - if (info->handle_duplicates == DUP_REPLACE && - table->next_number_field && - key_nr == table->s->next_number_index && - (insert_id_for_cur_row > 0)) + if (info->handle_duplicates == DUP_REPLACE && table->next_number_field && + key_nr == table->s->next_number_index && insert_id_for_cur_row > 0) goto err; if (table->file->ha_table_flags() & HA_DUPLICATE_POS) { @@ -1861,7 +1883,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) { if (!(thd->variables.old_behavior & OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE)) - table->file->print_error(error, MYF(ME_JUST_WARNING)); + table->file->print_error(error, MYF(ME_WARNING)); goto ok_or_after_trg_err; } goto err; @@ -2043,7 +2065,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) goto err; if (!(thd->variables.old_behavior & OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE)) - table->file->print_error(error, MYF(ME_JUST_WARNING)); + table->file->print_error(error, MYF(ME_WARNING)); table->file->restore_auto_increment(prev_insert_id); goto ok_or_after_trg_err; } @@ -2205,11 +2227,11 @@ public: mysql_mutex_init(key_delayed_insert_mutex, &mutex, MY_MUTEX_INIT_FAST); mysql_cond_init(key_delayed_insert_cond, &cond, NULL); mysql_cond_init(key_delayed_insert_cond_client, &cond_client, NULL); - mysql_mutex_lock(&LOCK_thread_count); + mysql_mutex_lock(&LOCK_delayed_insert); delayed_insert_threads++; + mysql_mutex_unlock(&LOCK_delayed_insert); delayed_lock= global_system_variables.low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE; - mysql_mutex_unlock(&LOCK_thread_count); DBUG_VOID_RETURN; } ~Delayed_insert() @@ -2221,21 +2243,15 @@ public: if (table) { close_thread_tables(&thd); - thd.mdl_context.release_transactional_locks(); + thd.mdl_context.release_transactional_locks(&thd); } mysql_mutex_destroy(&mutex); mysql_cond_destroy(&cond); mysql_cond_destroy(&cond_client); - /* - We could use unlink_not_visible_threads() here, but as - delayed_insert_threads also needs to be protected by - the LOCK_thread_count mutex, we open code this. - */ - mysql_mutex_lock(&LOCK_thread_count); - thd.unlink(); // Must be unlinked under lock + server_threads.erase(&thd); + mysql_mutex_assert_owner(&LOCK_delayed_insert); delayed_insert_threads--; - mysql_mutex_unlock(&LOCK_thread_count); my_free(thd.query()); thd.security_ctx->user= 0; @@ -2382,7 +2398,7 @@ bool delayed_get_table(THD *thd, MDL_request *grl_protection_request, di->thd.set_db(&table_list->db); di->thd.set_query(my_strndup(table_list->table_name.str, table_list->table_name.length, - MYF(MY_WME | ME_FATALERROR)), + MYF(MY_WME | ME_FATAL)), table_list->table_name.length, system_charset_info); if (di->thd.db.str == NULL || di->thd.query() == NULL) { @@ -2395,9 +2411,12 @@ bool delayed_get_table(THD *thd, MDL_request *grl_protection_request, di->table_list.alias.str= di->table_list.table_name.str= di->thd.query(); di->table_list.alias.length= di->table_list.table_name.length= di->thd.query_length(); di->table_list.db= di->thd.db; - /* We need the tickets so that they can be cloned in handle_delayed_insert */ - di->grl_protection.init(MDL_key::GLOBAL, "", "", - MDL_INTENTION_EXCLUSIVE, MDL_STATEMENT); + /* + We need the tickets so that they can be cloned in + handle_delayed_insert + */ + di->grl_protection.init(MDL_key::BACKUP, "", "", + MDL_BACKUP_DML, MDL_STATEMENT); di->grl_protection.ticket= grl_protection_request->ticket; init_mdl_requests(&di->table_list); di->table_list.mdl_request.ticket= table_list->mdl_request.ticket; @@ -2414,7 +2433,7 @@ bool delayed_get_table(THD *thd, MDL_request *grl_protection_request, mysql_mutex_unlock(&di->mutex); di->unlock(); delete di; - my_error(ER_CANT_CREATE_THREAD, MYF(ME_FATALERROR), error); + my_error(ER_CANT_CREATE_THREAD, MYF(ME_FATAL), error); goto end_create; } @@ -2472,10 +2491,12 @@ bool delayed_get_table(THD *thd, MDL_request *grl_protection_request, } /* Unlock the delayed insert object after its last access. */ di->unlock(); - DBUG_RETURN((table_list->table == NULL)); + DBUG_PRINT("exit", ("table_list->table: %p", table_list->table)); + DBUG_RETURN(thd->is_error()); end_create: mysql_mutex_unlock(&LOCK_delayed_create); + DBUG_PRINT("exit", ("is_error: %d", thd->is_error())); DBUG_RETURN(thd->is_error()); } @@ -2530,24 +2551,27 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd) if (thd.killed) { /* - Copy the error message. Note that we don't treat fatal - errors in the delayed thread as fatal errors in the - main thread. If delayed thread was killed, we don't - want to send "Server shutdown in progress" in the - INSERT THREAD. - - The thread could be killed with an error message if - di->handle_inserts() or di->open_and_lock_table() fails. - The thread could be killed without an error message if - killed using THD::notify_shared_lock() or - kill_delayed_threads_for_table(). + Check how the insert thread was killed. If it was killed + by FLUSH TABLES which calls kill_delayed_threads_for_table(), + then is_error is not set. + In this case, return without setting an error, + which means that the insert will be converted to a normal insert. */ - if (!thd.is_error()) - my_message(ER_QUERY_INTERRUPTED, ER_THD(&thd, ER_QUERY_INTERRUPTED), - MYF(0)); - else + if (thd.is_error()) + { + /* + Copy the error message. Note that we don't treat fatal + errors in the delayed thread as fatal errors in the + main thread. If delayed thread was killed, we don't + want to send "Server shutdown in progress" in the + INSERT THREAD. + + The thread could be killed with an error message if + di->handle_inserts() or di->open_and_lock_table() fails. + */ my_message(thd.get_stmt_da()->sql_errno(), thd.get_stmt_da()->message(), MYF(0)); + } goto error; } } @@ -2629,10 +2653,6 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd) share->default_fields) { bool error_reported= FALSE; - if (unlikely(!(copy->def_vcol_set= - (MY_BITMAP*) alloc_root(client_thd->mem_root, - sizeof(MY_BITMAP))))) - goto error; if (unlikely(parse_vcol_defs(client_thd, client_thd->mem_root, copy, &error_reported, VCOL_INIT_DEPENDENCY_FAILURE_IS_WARNING))) @@ -2652,15 +2672,6 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd) copy->def_write_set.bitmap= ((my_bitmap_map*) (bitmap + share->column_bitmap_size)); bitmaps_used= 2; - if (share->virtual_fields) - { - my_bitmap_init(copy->def_vcol_set, - (my_bitmap_map*) (bitmap + - bitmaps_used*share->column_bitmap_size), - share->fields, FALSE); - bitmaps_used++; - copy->vcol_set= copy->def_vcol_set; - } if (share->default_fields || share->default_expressions) { my_bitmap_init(©->has_value_set, @@ -2858,23 +2869,7 @@ void kill_delayed_threads(void) mysql_mutex_lock(&di->thd.LOCK_thd_kill); if (di->thd.killed < KILL_CONNECTION) di->thd.set_killed_no_mutex(KILL_CONNECTION); - if (di->thd.mysys_var) - { - mysql_mutex_lock(&di->thd.mysys_var->mutex); - if (di->thd.mysys_var->current_cond) - { - /* - We need the following test because the main mutex may be locked - in handle_delayed_insert() - */ - if (&di->mutex != di->thd.mysys_var->current_mutex) - mysql_mutex_lock(di->thd.mysys_var->current_mutex); - mysql_cond_broadcast(di->thd.mysys_var->current_cond); - if (&di->mutex != di->thd.mysys_var->current_mutex) - mysql_mutex_unlock(di->thd.mysys_var->current_mutex); - } - mysql_mutex_unlock(&di->thd.mysys_var->mutex); - } + di->thd.abort_current_cond_wait(false); mysql_mutex_unlock(&di->thd.LOCK_thd_kill); } mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list @@ -2999,7 +2994,7 @@ pthread_handler_t handle_delayed_insert(void *arg) pthread_detach_this_thread(); /* Add thread to THD list so that's it's visible in 'show processlist' */ thd->set_start_time(); - add_to_active_threads(thd); + server_threads.insert(thd); if (abort_loop) thd->set_killed(KILL_CONNECTION); else @@ -3157,11 +3152,30 @@ pthread_handler_t handle_delayed_insert(void *arg) mysql_mutex_unlock(&di->thd.mysys_var->mutex); mysql_mutex_lock(&di->mutex); } + + /* + The code depends on that the following ASSERT always hold. + I don't want to accidently introduce and bugs in the following code + in this commit, so I leave the small cleaning up of the code to + a future commit + */ + DBUG_ASSERT(thd->lock || di->stacked_inserts == 0); + DBUG_PRINT("delayed", - ("thd->killed: %d di->tables_in_use: %d thd->lock: %d", - thd->killed, di->tables_in_use, thd->lock != 0)); + ("thd->killed: %d di->status: %d di->stacked_insert: %d di->tables_in_use: %d thd->lock: %d", + thd->killed, di->status, di->stacked_inserts, di->tables_in_use, thd->lock != 0)); - if (di->tables_in_use && ! thd->lock && !thd->killed) + /* + This is used to test see what happens if killed is sent before + we have time to handle the insert requests. + */ + DBUG_EXECUTE_IF("write_delay_wakeup", + if (!thd->killed && di->stacked_inserts) + my_sleep(500000); + ); + + if (di->tables_in_use && ! thd->lock && + (!thd->killed || di->stacked_inserts)) { /* Request for new delayed insert. @@ -3335,7 +3349,7 @@ bool Delayed_insert::handle_inserts(void) or if another thread is removing the current table definition from the table cache. */ - my_error(ER_DELAYED_CANT_CHANGE_LOCK, MYF(ME_FATALERROR | ME_NOREFRESH), + my_error(ER_DELAYED_CANT_CHANGE_LOCK, MYF(ME_FATAL | ME_ERROR_LOG), table->s->table_name.str); goto err; } @@ -3511,7 +3525,7 @@ bool Delayed_insert::handle_inserts(void) { /* This is not known to happen. */ my_error(ER_DELAYED_CANT_CHANGE_LOCK, - MYF(ME_FATALERROR | ME_NOREFRESH), + MYF(ME_FATAL | ME_ERROR_LOG), table->s->table_name.str); goto err; } @@ -3609,7 +3623,7 @@ bool Delayed_insert::handle_inserts(void) bool mysql_insert_select_prepare(THD *thd) { LEX *lex= thd->lex; - SELECT_LEX *select_lex= &lex->select_lex; + SELECT_LEX *select_lex= lex->first_select_lex(); DBUG_ENTER("mysql_insert_select_prepare"); @@ -3698,7 +3712,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) select, LEX::current_select should point to the first select while we are fixing fields from insert list. */ - lex->current_select= &lex->select_lex; + lex->current_select= lex->first_select_lex(); res= (setup_fields(thd, Ref_ptr_array(), values, MARK_COLUMNS_READ, 0, NULL, 0) || @@ -3715,7 +3729,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) if (info.handle_duplicates == DUP_UPDATE && !res) { - Name_resolution_context *context= &lex->select_lex.context; + Name_resolution_context *context= &lex->first_select_lex()->context; Name_resolution_context_state ctx_state; /* Save the state of the current name resolution context. */ @@ -3725,7 +3739,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) table_list->next_local= 0; context->resolve_in_table_list_only(table_list); - lex->select_lex.no_wrap_view_item= TRUE; + lex->first_select_lex()->no_wrap_view_item= TRUE; res= res || check_update_fields(thd, context->table_list, *info.update_fields, *info.update_values, @@ -3736,22 +3750,26 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) */ true, &map); - lex->select_lex.no_wrap_view_item= FALSE; + lex->first_select_lex()->no_wrap_view_item= FALSE; /* - When we are not using GROUP BY and there are no ungrouped aggregate functions - we can refer to other tables in the ON DUPLICATE KEY part. - We use next_name_resolution_table descructively, so check it first (views?) + When we are not using GROUP BY and there are no ungrouped + aggregate functions we can refer to other tables in the ON + DUPLICATE KEY part. We use next_name_resolution_table + descructively, so check it first (views?) */ DBUG_ASSERT (!table_list->next_name_resolution_table); - if (lex->select_lex.group_list.elements == 0 && - !lex->select_lex.with_sum_func) + if (lex->first_select_lex()->group_list.elements == 0 && + !lex->first_select_lex()->with_sum_func) + { /* - We must make a single context out of the two separate name resolution contexts : - the INSERT table and the tables in the SELECT part of INSERT ... SELECT. - To do that we must concatenate the two lists + We must make a single context out of the two separate name + resolution contexts : the INSERT table and the tables in the + SELECT part of INSERT ... SELECT. To do that we must + concatenate the two lists */ table_list->next_name_resolution_table= ctx_state.get_first_name_resolution_table(); + } res= res || setup_fields(thd, Ref_ptr_array(), *info.update_values, MARK_COLUMNS_READ, 0, NULL, 0); @@ -3857,9 +3875,9 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) void DESCRIPTION - If the result table is the same as one of the source tables (INSERT SELECT), - the result table is not finally prepared at the join prepair phase. - Do the final preparation now. + If the result table is the same as one of the source tables + (INSERT SELECT), the result table is not finally prepared at the + join prepair phase. Do the final preparation now. RETURN 0 OK @@ -3999,10 +4017,13 @@ bool select_insert::prepare_eof() DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'", trans_table, table->file->table_type())); - error= (IF_WSREP((thd->wsrep_conflict_state == MUST_ABORT || - thd->wsrep_conflict_state == CERT_FAILURE) ? -1 :, ) - (thd->locked_tables_mode <= LTM_LOCK_TABLES ? - table->file->ha_end_bulk_insert() : 0)); +#ifdef WITH_WSREP + error= (thd->wsrep_cs().current_error()) ? -1 : + (thd->locked_tables_mode <= LTM_LOCK_TABLES) ? +#else + error= (thd->locked_tables_mode <= LTM_LOCK_TABLES) ? +#endif /* WITH_WSREP */ + table->file->ha_end_bulk_insert() : 0; if (likely(!error) && unlikely(thd->is_error())) error= thd->get_stmt_da()->sql_errno(); @@ -4187,9 +4208,9 @@ void select_insert::abort_result_set() Field *Item::create_field_for_create_select(TABLE *table) { - Field *def_field, *tmp_field; - return ::create_tmp_field(table->in_use, table, this, type(), - (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0); + static Tmp_field_param param(false, false, false, false); + Tmp_field_src src; + return create_tmp_field_ex(table, &src, ¶m); } @@ -4232,10 +4253,8 @@ Field *Item::create_field_for_create_select(TABLE *table) @retval 0 Error */ -TABLE *select_create::create_table_from_items(THD *thd, - List<Item> *items, - MYSQL_LOCK **lock, - TABLEOP_HOOKS *hooks) +TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items, + MYSQL_LOCK **lock, TABLEOP_HOOKS *hooks) { TABLE tmp_table; // Used during 'Create_field()' TABLE_SHARE share; @@ -4258,7 +4277,7 @@ TABLE *select_create::create_table_from_items(THD *thd, if (!opt_explicit_defaults_for_timestamp) promote_first_timestamp_column(&alter_info->create_list); - if (create_info->vers_fix_system_fields(thd, alter_info, *create_table)) + if (create_info->fix_create_fields(thd, alter_info, *create_table)) DBUG_RETURN(NULL); while ((item=it++)) @@ -4297,10 +4316,10 @@ TABLE *select_create::create_table_from_items(THD *thd, alter_info->create_list.push_back(cr_field, thd->mem_root); } - if (create_info->vers_check_system_fields(thd, alter_info, - create_table->table_name, - create_table->db, - select_field_count)) + if (create_info->check_fields(thd, alter_info, + create_table->table_name, + create_table->db, + select_field_count)) DBUG_RETURN(NULL); DEBUG_SYNC(thd,"create_table_select_before_create"); @@ -4521,8 +4540,6 @@ select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u) thd->binlog_start_trans_and_stmt(); } - DEBUG_SYNC(thd,"create_table_select_before_check_if_exists"); - if (!(table= create_table_from_items(thd, &values, &extra_lock, hook_ptr))) /* abort() deletes table */ DBUG_RETURN(-1); @@ -4640,9 +4657,16 @@ select_create::binlog_show_create_table(TABLE **tables, uint count) /* suppress_use */ FALSE, errcode) > 0; } - - ha_fake_trx_id(thd); - +#ifdef WITH_WSREP + if (thd->wsrep_trx().active()) + { + WSREP_DEBUG("transaction already started for CTAS"); + } + else + { + wsrep_start_transaction(thd, thd->wsrep_next_trx_id()); + } +#endif return result; } @@ -4700,10 +4724,19 @@ bool select_create::send_eof() if (!table->s->tmp_table) { #ifdef WITH_WSREP - if (WSREP_ON) + if (WSREP(thd) && + table->file->ht->db_type == DB_TYPE_INNODB) { + if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID) + { + wsrep_start_transaction(thd, thd->wsrep_next_trx_id()); + } + DBUG_ASSERT(thd->wsrep_trx_id() != WSREP_UNDEFINED_TRX_ID); + WSREP_DEBUG("CTAS key append for trx: %" PRIu64 " thd %llu query %lld ", + thd->wsrep_trx_id(), thd->thread_id, thd->query_id); + /* - append table level exclusive key for CTAS + append table level exclusive key for CTAS */ wsrep_key_arr_t key_arr= {0, 0}; wsrep_prepare_keys_for_isolation(thd, @@ -4711,15 +4744,11 @@ bool select_create::send_eof() create_table->table_name.str, table_list, &key_arr); - int rcode = wsrep->append_key( - wsrep, - &thd->wsrep_ws_handle, - key_arr.keys, //&wkey, - key_arr.keys_len, - WSREP_KEY_EXCLUSIVE, - false); + int rcode= wsrep_thd_append_key(thd, key_arr.keys, key_arr.keys_len, + WSREP_SERVICE_KEY_EXCLUSIVE); wsrep_keys_free(&key_arr); - if (rcode) { + if (rcode) + { DBUG_PRINT("wsrep", ("row key failed: %d", rcode)); WSREP_ERROR("Appending table key for CTAS failed: %s, %d", (wsrep_thd_query(thd)) ? @@ -4728,22 +4757,22 @@ bool select_create::send_eof() DBUG_RETURN(true); } /* If commit fails, we should be able to reset the OK status. */ - thd->get_stmt_da()->set_overwrite_status(TRUE); + thd->get_stmt_da()->set_overwrite_status(true); } #endif /* WITH_WSREP */ trans_commit_stmt(thd); if (!(thd->variables.option_bits & OPTION_GTID_BEGIN)) trans_commit_implicit(thd); #ifdef WITH_WSREP - if (WSREP_ON) + if (WSREP(thd)) { thd->get_stmt_da()->set_overwrite_status(FALSE); mysql_mutex_lock(&thd->LOCK_thd_data); - if (thd->wsrep_conflict_state != NO_CONFLICT) + if (wsrep_current_error(thd)) { - WSREP_DEBUG("select_create commit failed, thd: %lld err: %d %s", - (longlong) thd->thread_id, thd->wsrep_conflict_state, - thd->query()); + WSREP_DEBUG("select_create commit failed, thd: %llu err: %s %s", + thd->thread_id, + wsrep_thd_transaction_state_str(thd), WSREP_QUERY(thd)); mysql_mutex_unlock(&thd->LOCK_thd_data); abort_result_set(); DBUG_RETURN(true); |
