diff options
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r-- | sql/sql_insert.cc | 705 |
1 files changed, 466 insertions, 239 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 4bf15bfbb9b..c4d0a871ba5 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -67,7 +67,6 @@ #include "sp_head.h" #include "sql_view.h" // check_key_in_view, insert_view_fields #include "sql_table.h" // mysql_create_table_no_lock -#include "sql_acl.h" // *_ACL, check_grant_all_columns #include "sql_trigger.h" #include "sql_select.h" #include "sql_show.h" @@ -96,6 +95,8 @@ pthread_handler_t handle_delayed_insert(void *arg); static void unlink_blobs(TABLE *table); #endif static bool check_view_insertability(THD *thd, TABLE_LIST *view); +static int binlog_show_create_table(THD *thd, TABLE *table, + Table_specification_st *create_info); /* Check that insert/update fields are from the same single table of a view. @@ -554,8 +555,8 @@ bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list) if (thd->has_read_only_protection()) DBUG_RETURN(TRUE); - protection_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_DML, - MDL_STATEMENT); + MDL_REQUEST_INIT(&protection_request, MDL_key::BACKUP, "", "", + MDL_BACKUP_DML, MDL_STATEMENT); if (thd->mdl_context.acquire_lock(&protection_request, thd->variables.lock_wait_timeout)) @@ -657,24 +658,12 @@ static void save_insert_query_plan(THD* thd, TABLE_LIST *table_list) thd->lex->explain->add_insert_plan(explain); - /* See Update_plan::updating_a_view for details */ - bool skip= MY_TEST(table_list->view); - /* Save subquery children */ for (SELECT_LEX_UNIT *unit= thd->lex->first_select_lex()->first_inner_unit(); unit; unit= unit->next_unit()) { - if (skip) - { - skip= false; - continue; - } - /* - Table elimination doesn't work for INSERTS, but let's still have this - here for consistency - */ - if (!(unit->item && unit->item->eliminated)) + if (unit->explainable()) explain->add_child(unit->first_select()->select_number); } } @@ -689,18 +678,19 @@ Field **TABLE::field_to_fill() /** INSERT statement implementation + SYNOPSIS + mysql_insert() + result NULL if the insert is not outputing results + via 'RETURNING' clause. + @note Like implementations of other DDL/DML in MySQL, this function relies on the caller to close the thread tables. This is done in the end of dispatch_command(). */ - -bool mysql_insert(THD *thd,TABLE_LIST *table_list, - List<Item> &fields, - List<List_item> &values_list, - List<Item> &update_fields, - List<Item> &update_values, - enum_duplicates duplic, - bool ignore) +bool mysql_insert(THD *thd, TABLE_LIST *table_list, + List<Item> &fields, List<List_item> &values_list, + List<Item> &update_fields, List<Item> &update_values, + enum_duplicates duplic, bool ignore, select_result *result) { bool retval= true; int error, res; @@ -719,6 +709,9 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, List_item *values; Name_resolution_context *context; Name_resolution_context_state ctx_state; + SELECT_LEX *returning= thd->lex->has_returning() ? thd->lex->returning() : 0; + unsigned char *readbuff= NULL; + #ifndef EMBEDDED_LIBRARY char *query= thd->query(); /* @@ -732,6 +725,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, Item *unused_conds= 0; DBUG_ENTER("mysql_insert"); + bzero((char*) &info,sizeof(info)); create_explain_query(thd->lex, thd->mem_root); /* Upgrade lock type if the requested lock is incompatible with @@ -744,8 +738,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, this will lead to a deadlock, since the delayed thread will never be able to get a lock on the table. */ - if (table_list->lock_type == TL_WRITE_DELAYED && - thd->locked_tables_mode && + if (table_list->lock_type == TL_WRITE_DELAYED && thd->locked_tables_mode && find_locked_table(thd->open_tables, table_list->db.str, table_list->table_name.str)) { @@ -773,14 +766,47 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, DBUG_RETURN(TRUE); value_count= values->elements; - if (mysql_prepare_insert(thd, table_list, table, fields, values, - update_fields, update_values, duplic, &unused_conds, - FALSE)) + if ((res= mysql_prepare_insert(thd, table_list, fields, values, + update_fields, update_values, duplic, + &unused_conds, FALSE))) + { + retval= thd->is_error(); + if (res < 0) + { + /* + Insert should be ignored but we have to log the query in statement + format in the binary log + */ + if (thd->binlog_current_query_unfiltered()) + retval= 1; + } goto abort; - + } /* mysql_prepare_insert sets table_list->table if it was not set */ table= table_list->table; + /* Prepares LEX::returing_list if it is not empty */ + if (returning) + { + result->prepare(returning->item_list, NULL); + if (thd->is_bulk_op()) + { + /* + It is RETURNING which needs network buffer to write result set and + it is array binfing which need network buffer to read parameters. + So we allocate yet another network buffer. + The old buffer will be freed at the end of operation. + */ + DBUG_ASSERT(thd->protocol == &thd->protocol_binary); + readbuff= thd->net.buff; // old buffer + if (net_allocate_new_packet(&thd->net, thd, MYF(MY_THREAD_SPECIFIC))) + { + readbuff= NULL; // failure, net_allocate_new_packet keeps old buffer + goto abort; + } + } + } + context= &thd->lex->first_select_lex()->context; /* These three asserts test the hypothesis that the resetting of the name @@ -834,7 +860,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, /* Fill in the given fields and dump it to the table file */ - bzero((char*) &info,sizeof(info)); info.ignore= ignore; info.handle_duplicates=duplic; info.update_fields= &update_fields; @@ -884,13 +909,18 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, if (lock_type != TL_WRITE_DELAYED) #endif /* EMBEDDED_LIBRARY */ { + bool create_lookup_handler= duplic != DUP_ERROR; if (duplic != DUP_ERROR || ignore) { + create_lookup_handler= true; table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); - if (table->file->ha_table_flags() & HA_DUPLICATE_POS && - table->file->ha_rnd_init_with_error(0)) - goto abort; + if (table->file->ha_table_flags() & HA_DUPLICATE_POS) + { + if (table->file->ha_rnd_init_with_error(0)) + goto abort; + } } + table->file->prepare_for_insert(create_lookup_handler); /** This is a simple check for the case when the table has a trigger that reads from it, or when the statement invokes a stored function @@ -949,8 +979,19 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, goto values_loop_end; } } + /* + If statement returns result set, we need to send the result set metadata + to the client so that it knows that it has to expect an EOF or ERROR. + At this point we have all the required information to send the result set + metadata. + */ + if (returning && + result->send_result_set_metadata(returning->item_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) + goto values_loop_end; THD_STAGE_INFO(thd, stage_update); + thd->decide_logging_format_low(table); do { DBUG_PRINT("info", ("iteration %llu", iteration)); @@ -1063,7 +1104,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, break; } - thd->decide_logging_format_low(table); #ifndef EMBEDDED_LIBRARY if (lock_type == TL_WRITE_DELAYED) { @@ -1075,7 +1115,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, } else #endif - error=write_record(thd, table ,&info); + error= write_record(thd, table, &info, result); if (unlikely(error)) break; thd->get_stmt_da()->inc_current_row_for_warning(); @@ -1089,7 +1129,7 @@ values_loop_end: joins_freed= TRUE; /* - Now all rows are inserted. Time to update logs and sends response to + Now all rows are inserted. Time to update logs and sends response to user */ #ifndef EMBEDDED_LIBRARY @@ -1134,7 +1174,7 @@ values_loop_end: table->file->ha_rnd_end(); } - transactional_table= table->file->has_transactions(); + transactional_table= table->file->has_transactions_and_rollback(); if (likely(changed= (info.copied || info.deleted || info.updated))) { @@ -1146,13 +1186,13 @@ values_loop_end: query_cache_invalidate3(thd, table_list, 1); } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; - thd->transaction.all.m_unsafe_rollback_flags|= - (thd->transaction.stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT); + if (thd->transaction->stmt.modified_non_trans_table) + thd->transaction->all.modified_non_trans_table= TRUE; + thd->transaction->all.m_unsafe_rollback_flags|= + (thd->transaction->stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT); if (error <= 0 || - thd->transaction.stmt.modified_non_trans_table || + thd->transaction->stmt.modified_non_trans_table || was_insert_delayed) { if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) @@ -1190,7 +1230,7 @@ values_loop_end: such case the flag is ignored for constructing binlog event. */ DBUG_ASSERT(thd->killed != KILL_BAD_DATA || error > 0); - if (was_insert_delayed && table_list->lock_type == TL_WRITE) + if (was_insert_delayed && table_list->lock_type == TL_WRITE) { /* Binlog INSERT DELAYED as INSERT without DELAYED. */ String log_query; @@ -1215,7 +1255,7 @@ values_loop_end: } } DBUG_ASSERT(transactional_table || !changed || - thd->transaction.stmt.modified_non_trans_table); + thd->transaction->stmt.modified_non_trans_table); } THD_STAGE_INFO(thd, stage_end); /* @@ -1245,7 +1285,7 @@ values_loop_end: goto abort; if (thd->lex->analyze_stmt) { - retval= thd->lex->explain->send_explain(thd); + retval= 0; goto abort; } DBUG_PRINT("info", ("touched: %llu copied: %llu updated: %llu deleted: %llu", @@ -1255,26 +1295,39 @@ values_loop_end: if ((iteration * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) || !thd->cuted_fields)) { - my_ok(thd, info.copied + info.deleted + + /* + Client expects an EOF/OK packet if result set metadata was sent. If + LEX::has_returning and the statement returns result set + we send EOF which is the indicator of the end of the row stream. + Oherwise we send an OK packet i.e when the statement returns only the + status information + */ + if (returning) + result->send_eof(); + else + my_ok(thd, info.copied + info.deleted + ((thd->client_capabilities & CLIENT_FOUND_ROWS) ? - info.touched : info.updated), - id); + info.touched : info.updated), id); } else { char buff[160]; ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ? info.touched : info.updated); + if (ignore) sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records, - (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 : - (ulong) (info.records - info.copied), + (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 : + (ulong) (info.records - info.copied), (long) thd->get_stmt_da()->current_statement_warn_count()); else sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records, - (ulong) (info.deleted + updated), + (ulong) (info.deleted + updated), (long) thd->get_stmt_da()->current_statement_warn_count()); - ::my_ok(thd, info.copied + info.deleted + updated, id, buff); + if (returning) + result->send_eof(); + else + ::my_ok(thd, info.copied + info.deleted + updated, id, buff); } thd->abort_on_warning= 0; if (thd->lex->current_select->first_cond_optimization) @@ -1282,7 +1335,8 @@ values_loop_end: thd->lex->current_select->save_leaf_tables(thd); thd->lex->current_select->first_cond_optimization= 0; } - + if (readbuff) + my_free(readbuff); DBUG_RETURN(FALSE); abort: @@ -1307,6 +1361,8 @@ abort: if (!joins_freed) free_underlaid_joins(thd, thd->lex->first_select_lex()); thd->abort_on_warning= 0; + if (readbuff) + my_free(readbuff); DBUG_RETURN(retval); } @@ -1400,6 +1456,33 @@ static bool check_view_insertability(THD * thd, TABLE_LIST *view) } +/** + TODO remove when MDEV-17395 will be closed + + Checks if REPLACE or ON DUPLICATE UPDATE was executed on table containing + WITHOUT OVERLAPS key. + + @return + 0 if no error + ER_NOT_SUPPORTED_YET if the above condidion was met + */ +int check_duplic_insert_without_overlaps(THD *thd, TABLE *table, + enum_duplicates duplic) +{ + if (duplic == DUP_REPLACE || duplic == DUP_UPDATE) + { + for (uint k = 0; k < table->s->keys; k++) + { + if (table->key_info[k].without_overlaps) + { + my_error(ER_NOT_SUPPORTED_YET, MYF(0), "WITHOUT OVERLAPS"); + return ER_NOT_SUPPORTED_YET; + } + } + } + return 0; +} + /* Check if table can be updated @@ -1494,12 +1577,10 @@ static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables) SYNOPSIS mysql_prepare_insert() - thd Thread handler - table_list Global/local table list - table Table to insert into (can be NULL if table should - be taken from table_list->table) - where Where clause (for insert ... select) - select_insert TRUE if INSERT ... SELECT statement + thd Thread handler + table_list Global/local table list + where Where clause (for insert ... select) + select_insert TRUE if INSERT ... SELECT statement TODO (in far future) In cases of: @@ -1510,17 +1591,18 @@ static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables) WARNING You MUST set table->insert_values to 0 after calling this function before releasing the table object. - + RETURN VALUE - FALSE OK - TRUE error + 0 OK + >0 error + <0 insert should be ignored */ -bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, - TABLE *table, List<Item> &fields, List_item *values, - List<Item> &update_fields, List<Item> &update_values, - enum_duplicates duplic, COND **where, - bool select_insert) +int mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, + List<Item> &fields, List_item *values, + List<Item> &update_fields, List<Item> &update_values, + enum_duplicates duplic, COND **where, + bool select_insert) { SELECT_LEX *select_lex= thd->lex->first_select_lex(); Name_resolution_context *context= &select_lex->context; @@ -1528,48 +1610,34 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, bool insert_into_view= (table_list->view != 0); bool res= 0; table_map map= 0; + TABLE *table; DBUG_ENTER("mysql_prepare_insert"); - DBUG_PRINT("enter", ("table_list: %p table: %p view: %d", - table_list, table, - (int)insert_into_view)); + DBUG_PRINT("enter", ("table_list: %p view: %d", + table_list, (int) insert_into_view)); /* INSERT should have a SELECT or VALUES clause */ DBUG_ASSERT (!select_insert || !values); if (mysql_handle_derived(thd->lex, DT_INIT)) - DBUG_RETURN(TRUE); + DBUG_RETURN(1); if (table_list->handle_derived(thd->lex, DT_MERGE_FOR_INSERT)) - DBUG_RETURN(TRUE); + DBUG_RETURN(1); if (thd->lex->handle_list_of_derived(table_list, DT_PREPARE)) - DBUG_RETURN(TRUE); - /* - For subqueries in VALUES() we should not see the table in which we are - inserting (for INSERT ... SELECT this is done by changing table_list, - because INSERT ... SELECT share SELECT_LEX it with SELECT. - */ - if (!select_insert) - { - for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit(); - un; - un= un->next_unit()) - { - for (SELECT_LEX *sl= un->first_select(); - sl; - sl= sl->next_select()) - { - sl->context.outer_context= 0; - } - } - } + DBUG_RETURN(1); if (duplic == DUP_UPDATE) { /* it should be allocated before Item::fix_fields() */ if (table_list->set_insert_values(thd->mem_root)) - DBUG_RETURN(TRUE); + DBUG_RETURN(1); } + table= table_list->table; + + if (table->file->check_if_updates_are_ignored("INSERT")) + DBUG_RETURN(-1); + if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert)) - DBUG_RETURN(TRUE); + DBUG_RETURN(1); /* Prepare the fields in the statement. */ if (values) @@ -1587,10 +1655,11 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, table_list->next_local= 0; context->resolve_in_table_list_only(table_list); - res= (setup_fields(thd, Ref_ptr_array(), - *values, MARK_COLUMNS_READ, 0, NULL, 0) || + res= setup_returning_fields(thd, table_list) || + setup_fields(thd, Ref_ptr_array(), + *values, MARK_COLUMNS_READ, 0, NULL, 0) || check_insert_fields(thd, context->table_list, fields, *values, - !insert_into_view, 0, &map)); + !insert_into_view, 0, &map); if (!res) res= setup_fields(thd, Ref_ptr_array(), @@ -1611,14 +1680,14 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, if (res) DBUG_RETURN(res); - if (!table) - table= table_list->table; + if (check_duplic_insert_without_overlaps(thd, table, duplic) != 0) + DBUG_RETURN(true); if (table->versioned(VERS_TIMESTAMP) && duplic == DUP_REPLACE) { // Additional memory may be required to create historical items. if (table_list->set_insert_values(thd->mem_root)) - DBUG_RETURN(TRUE); + DBUG_RETURN(1); } if (!select_insert) @@ -1629,7 +1698,7 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, CHECK_DUP_ALLOW_DIFFERENT_ALIAS))) { update_non_unique_table_error(table_list, "INSERT", duplicate); - DBUG_RETURN(TRUE); + DBUG_RETURN(1); } select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds); } @@ -1639,7 +1708,7 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, */ if (duplic == DUP_UPDATE || duplic == DUP_REPLACE) prepare_for_positional_update(table, table_list); - DBUG_RETURN(FALSE); + DBUG_RETURN(0); } @@ -1711,6 +1780,7 @@ int vers_insert_history_row(TABLE *table) info - COPY_INFO structure describing handling of duplicates and which is used for counting number of records inserted and deleted. + sink - result sink for the RETURNING clause NOTE Once this record will be written to table after insert trigger will @@ -1718,8 +1788,8 @@ int vers_insert_history_row(TABLE *table) then both on update triggers will work instead. Similarly both on delete triggers will be invoked if we will delete conflicting records. - Sets thd->transaction.stmt.modified_non_trans_table to TRUE if table which is updated didn't have - transactions. + Sets thd->transaction.stmt.modified_non_trans_table to TRUE if table which + is updated didn't have transactions. RETURN VALUE 0 - success @@ -1727,7 +1797,7 @@ int vers_insert_history_row(TABLE *table) */ -int write_record(THD *thd, TABLE *table,COPY_INFO *info) +int write_record(THD *thd, TABLE *table, COPY_INFO *info, select_result *sink) { int error, trg_error= 0; char *key=0; @@ -1738,7 +1808,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) DBUG_ENTER("write_record"); info->records++; - save_read_set= table->read_set; + save_read_set= table->read_set; save_write_set= table->write_set; if (info->handle_duplicates == DUP_REPLACE || @@ -1760,7 +1830,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) table->file->insert_id_for_cur_row= insert_id_for_cur_row; bool is_duplicate_key_error; if (table->file->is_fatal_error(error, HA_CHECK_ALL)) - goto err; + goto err; is_duplicate_key_error= table->file->is_fatal_error(error, HA_CHECK_ALL & ~HA_CHECK_DUP); if (!is_duplicate_key_error) @@ -1773,7 +1843,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) if (info->ignore) { table->file->print_error(error, MYF(ME_WARNING)); - goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */ + goto after_trg_or_ignored_err; /* Ignoring a not fatal error */ } goto err; } @@ -1878,7 +1948,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */ res= info->table_list->view_check_option(table->in_use, info->ignore); if (res == VIEW_CHECK_SKIP) - goto ok_or_after_trg_err; + goto after_trg_or_ignored_err; if (res == VIEW_CHECK_ERROR) goto before_trg_err; @@ -1896,7 +1966,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) if (!(thd->variables.old_behavior & OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE)) table->file->print_error(error, MYF(ME_WARNING)); - goto ok_or_after_trg_err; + goto after_trg_or_ignored_err; } goto err; } @@ -1915,7 +1985,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) table->file->print_error(error, MYF(0)); trg_error= 1; restore_record(table, record[2]); - goto ok_or_after_trg_err; + goto after_trg_or_ignored_err; } restore_record(table, record[2]); } @@ -1956,7 +2026,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) { table->file->restore_auto_increment(prev_insert_id_for_cur_row); } - goto ok_or_after_trg_err; + goto ok; } else /* DUP_REPLACE */ { @@ -1973,8 +2043,6 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) tables which have ON UPDATE but have no ON DELETE triggers, we just should not expose this fact to users by invoking ON UPDATE triggers. - For system versioning wa also use path through delete since we would - save nothing through this cheating. */ if (last_uniq_key(table,key_nr) && !table->file->referenced_by_foreign_key() && @@ -1994,7 +2062,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) { info->deleted++; if (!table->file->has_transactions()) - thd->transaction.stmt.modified_non_trans_table= TRUE; + thd->transaction->stmt.modified_non_trans_table= TRUE; if (table->versioned(VERS_TIMESTAMP)) { store_record(table, record[2]); @@ -2036,14 +2104,14 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) info->deleted++; else info->updated++; - if (!table->file->has_transactions()) - thd->transaction.stmt.modified_non_trans_table= TRUE; + if (!table->file->has_transactions_and_rollback()) + thd->transaction->stmt.modified_non_trans_table= TRUE; if (table->triggers && table->triggers->process_triggers(thd, TRG_EVENT_DELETE, TRG_ACTION_AFTER, TRUE)) { trg_error= 1; - goto ok_or_after_trg_err; + goto after_trg_or_ignored_err; } /* Let us attempt do write_row() once more */ } @@ -2079,7 +2147,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE)) table->file->print_error(error, MYF(ME_WARNING)); table->file->restore_auto_increment(prev_insert_id); - goto ok_or_after_trg_err; + goto after_trg_or_ignored_err; } after_trg_n_copied_inc: @@ -2089,11 +2157,21 @@ after_trg_n_copied_inc: table->triggers->process_triggers(thd, TRG_EVENT_INSERT, TRG_ACTION_AFTER, TRUE)); -ok_or_after_trg_err: +ok: + /* + We send the row after writing it to the table so that the + correct values are sent to the client. Otherwise it won't show + autoinc values (generated inside the handler::ha_write()) and + values updated in ON DUPLICATE KEY UPDATE. + */ + if (sink && sink->send_data(thd->lex->returning()->item_list) < 0) + trg_error= 1; + +after_trg_or_ignored_err: if (key) my_safe_afree(key,table->s->max_unique_length); - if (!table->file->has_transactions()) - thd->transaction.stmt.modified_non_trans_table= TRUE; + if (!table->file->has_transactions_and_rollback()) + thd->transaction->stmt.modified_non_trans_table= TRUE; DBUG_RETURN(trg_error); err: @@ -2408,7 +2486,8 @@ bool delayed_get_table(THD *thd, MDL_request *grl_protection_request, di->thd.variables.binlog_annotate_row_events= 0; di->thd.set_db(&table_list->db); - di->thd.set_query(my_strndup(table_list->table_name.str, + di->thd.set_query(my_strndup(PSI_INSTRUMENT_ME, + table_list->table_name.str, table_list->table_name.length, MYF(MY_WME | ME_FATAL)), table_list->table_name.length, system_charset_info); @@ -2427,8 +2506,8 @@ bool delayed_get_table(THD *thd, MDL_request *grl_protection_request, We need the tickets so that they can be cloned in handle_delayed_insert */ - di->grl_protection.init(MDL_key::BACKUP, "", "", - MDL_BACKUP_DML, MDL_STATEMENT); + MDL_REQUEST_INIT(&di->grl_protection, MDL_key::BACKUP, "", "", + MDL_BACKUP_DML, MDL_STATEMENT); di->grl_protection.ticket= grl_protection_request->ticket; init_mdl_requests(&di->table_list); di->table_list.mdl_request.ticket= table_list->mdl_request.ticket; @@ -2508,7 +2587,7 @@ bool delayed_get_table(THD *thd, MDL_request *grl_protection_request, end_create: mysql_mutex_unlock(&LOCK_delayed_create); - DBUG_PRINT("exit", ("is_error: %d", thd->is_error())); + DBUG_PRINT("exit", ("is_error(): %d", thd->is_error())); DBUG_RETURN(thd->is_error()); } @@ -2544,6 +2623,8 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd) uchar *bitmap; char *copy_tmp; uint bitmaps_used; + Field **default_fields, **virtual_fields; + uchar *record; DBUG_ENTER("Delayed_insert::get_local_table"); /* First request insert thread to get a lock */ @@ -2590,18 +2671,29 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd) share= table->s; /* - Allocate memory for the TABLE object, the field pointers array, and - one record buffer of reclength size. Normally a table has three - record buffers of rec_buff_length size, which includes alignment - bytes. Since the table copy is used for creating one record only, - the other record buffers and alignment are unnecessary. + Allocate memory for the TABLE object, the field pointers array, + and one record buffer of reclength size. + Normally a table has three record buffers of rec_buff_length size, + which includes alignment bytes. Since the table copy is used for + creating one record only, the other record buffers and alignment + are unnecessary. + As the table will also need to calculate default values and + expresions, we have to allocate own version of fields. keys and key + parts. The key and key parts are needed as parse_vcol_defs() changes + them in case of long hash keys. */ THD_STAGE_INFO(client_thd, stage_allocating_local_table); - copy_tmp= (char*) client_thd->alloc(sizeof(*copy)+ - (share->fields+1)*sizeof(Field**)+ - share->reclength + - share->column_bitmap_size*4); - if (!copy_tmp) + if (!multi_alloc_root(client_thd->mem_root, + ©_tmp, sizeof(*table), + &field, (uint) (share->fields+1)*sizeof(Field**), + &default_fields, + (share->default_fields + + share->default_expressions + 1) * sizeof(Field*), + &virtual_fields, + (share->virtual_fields + 1) * sizeof(Field*), + &record, (uint) share->reclength, + &bitmap, (uint) share->column_bitmap_size*4, + NullS)) goto error; /* Copy the TABLE object. */ @@ -2610,27 +2702,14 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd) /* We don't need to change the file handler here */ /* Assign the pointers for the field pointers array and the record. */ - field= copy->field= (Field**) (copy + 1); - bitmap= (uchar*) (field + share->fields + 1); - copy->record[0]= (bitmap + share->column_bitmap_size*4); + copy->field= field; + copy->record[0]= record; memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength); if (share->default_fields || share->default_expressions) - { - copy->default_field= (Field**) - client_thd->alloc((share->default_fields + - share->default_expressions + 1)* - sizeof(Field*)); - if (!copy->default_field) - goto error; - } - + copy->default_field= default_fields; if (share->virtual_fields) - { - copy->vfield= (Field **) client_thd->alloc((share->virtual_fields+1)* - sizeof(Field*)); - if (!copy->vfield) - goto error; - } + copy->vfield= virtual_fields; + copy->expr_arena= NULL; /* Ensure we don't use the table list of the original table */ @@ -2653,6 +2732,8 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd) (*field)->invisible= (*org_field)->invisible; (*field)->orig_table= copy; // Remove connection (*field)->move_field_offset(adjust_ptrs); // Point at copy->record[0] + (*field)->flags|= ((*org_field)->flags & LONG_UNIQUE_HASH_FIELD); + (*field)->invisible= (*org_field)->invisible; memdup_vcol(client_thd, (*field)->vcol_info); memdup_vcol(client_thd, (*field)->default_value); memdup_vcol(client_thd, (*field)->check_constraint); @@ -2661,6 +2742,9 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd) } *field=0; + if (copy_keys_from_share(copy, client_thd->mem_root)) + goto error; + if (share->virtual_fields || share->default_expressions || share->default_fields) { @@ -2738,7 +2822,8 @@ int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic, if (query.str) { char *str; - if (!(str= my_strndup(query.str, query.length, MYF(MY_WME)))) + if (!(str= my_strndup(PSI_INSTRUMENT_ME, query.str, query.length, + MYF(MY_WME)))) goto err; query.str= str; } @@ -2761,7 +2846,8 @@ int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic, ip_length= strlen(thd->security_ctx->ip) + 1; } /* This can't be THREAD_SPECIFIC as it's freed in delayed thread */ - if (!(row->record= (char*) my_malloc(table->s->reclength + + if (!(row->record= (char*) my_malloc(PSI_INSTRUMENT_ME, + table->s->reclength + user_len + host_len + ip_length, MYF(MY_WME)))) goto err; @@ -2990,6 +3076,8 @@ bool Delayed_insert::open_and_lock_table() return TRUE; } table->copy_blobs= 1; + + table->file->prepare_for_row_logging(); return FALSE; } @@ -3033,15 +3121,16 @@ pthread_handler_t handle_delayed_insert(void *arg) { DBUG_ENTER("handle_delayed_insert"); thd->thread_stack= (char*) &thd; - if (init_thr_lock() || thd->store_globals()) + if (init_thr_lock()) { - /* Can't use my_error since store_globals has perhaps failed */ thd->get_stmt_da()->set_error_status(ER_OUT_OF_RESOURCES); di->handler_thread_initialized= TRUE; thd->fatal_error(); goto err; } + thd->store_globals(); + thd->lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock() /* @@ -3049,6 +3138,8 @@ pthread_handler_t handle_delayed_insert(void *arg) at which rows are inserted cannot be determined in mixed mode. */ thd->set_current_stmt_binlog_format_row_if_mixed(); + /* Don't annotate insert delayed binlog events */ + thd->variables.binlog_annotate_row_events= 0; /* Clone tickets representing protection against GRL and the lock on @@ -3247,8 +3338,19 @@ pthread_handler_t handle_delayed_insert(void *arg) trans_commit_stmt(thd); di->group_count=0; mysql_audit_release(thd); + /* + Reset binlog. We can't call ha_reset() for the table as this will + reset the table maps we have calculated earlier. + */ mysql_mutex_lock(&di->mutex); } + + /* + Reset binlog. We can't call ha_reset() for the table as this will + reset the table maps we have calculated earlier. + */ + thd->reset_binlog_for_next_statement(); + if (di->tables_in_use) mysql_cond_broadcast(&di->cond_client); // If waiting clients } @@ -3340,9 +3442,7 @@ bool Delayed_insert::handle_inserts(void) { int error; ulong max_rows; - bool has_trans = TRUE; - bool using_ignore= 0, using_opt_replace= 0, - using_bin_log= mysql_bin_log.is_open(); + bool using_ignore= 0, using_opt_replace= 0, using_bin_log; delayed_row *row; DBUG_ENTER("handle_inserts"); @@ -3376,6 +3476,13 @@ bool Delayed_insert::handle_inserts(void) if (table->file->ha_rnd_init_with_error(0)) goto err; + /* + We have to call prepare_for_row_logging() as the second call to + handler_writes() will not have called decide_logging_format. + */ + table->file->prepare_for_row_logging(); + table->file->prepare_for_insert(1); + using_bin_log= table->file->row_logging; /* We can't use row caching when using the binary log because if @@ -3384,6 +3491,7 @@ bool Delayed_insert::handle_inserts(void) */ if (!using_bin_log) table->file->extra(HA_EXTRA_WRITE_CACHE); + mysql_mutex_lock(&mutex); while ((row=rows.get())) @@ -3412,8 +3520,8 @@ bool Delayed_insert::handle_inserts(void) Guaranteed that the INSERT DELAYED STMT will not be here in SBR when mysql binlog is enabled. */ - DBUG_ASSERT(!(mysql_bin_log.is_open() && - !thd.is_current_stmt_binlog_format_row())); + DBUG_ASSERT(!mysql_bin_log.is_open() || + thd.is_current_stmt_binlog_format_row()); /* This is the first value of an INSERT statement. @@ -3476,7 +3584,7 @@ bool Delayed_insert::handle_inserts(void) VCOL_UPDATE_FOR_WRITE); } - if (unlikely(tmp_error) || unlikely(write_record(&thd, table, &info))) + if (unlikely(tmp_error || write_record(&thd, table, &info, NULL))) { info.error_count++; // Ignore errors thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status); @@ -3571,10 +3679,9 @@ bool Delayed_insert::handle_inserts(void) TODO: Move the logging to last in the sequence of rows. */ - has_trans= thd.lex->sql_command == SQLCOM_CREATE_TABLE || - table->file->has_transactions(); - if (thd.is_current_stmt_binlog_format_row() && - thd.binlog_flush_pending_rows_event(TRUE, has_trans)) + if (table->file->row_logging && + thd.binlog_flush_pending_rows_event(TRUE, + table->file->row_logging_has_trans)) goto err; if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE)))) @@ -3628,27 +3735,35 @@ bool Delayed_insert::handle_inserts(void) thd thread handler RETURN - FALSE OK - TRUE Error + 0 OK + > 0 Error + < 0 Ok, ignore insert */ -bool mysql_insert_select_prepare(THD *thd) +int mysql_insert_select_prepare(THD *thd, select_result *sel_res) { + int res; LEX *lex= thd->lex; SELECT_LEX *select_lex= lex->first_select_lex(); DBUG_ENTER("mysql_insert_select_prepare"); - /* SELECT_LEX do not belong to INSERT statement, so we can't add WHERE clause if table is VIEW */ - - if (mysql_prepare_insert(thd, lex->query_tables, - lex->query_tables->table, lex->field_list, 0, - lex->update_list, lex->value_list, lex->duplicates, - &select_lex->where, TRUE)) - DBUG_RETURN(TRUE); + + if ((res= mysql_prepare_insert(thd, lex->query_tables, lex->field_list, 0, + lex->update_list, lex->value_list, + lex->duplicates, + &select_lex->where, TRUE))) + DBUG_RETURN(res); + + /* + If sel_res is not empty, it means we have items in returing_list. + So we prepare the list now + */ + if (sel_res) + sel_res->prepare(lex->returning()->item_list, NULL); DBUG_ASSERT(select_lex->leaf_tables.elements != 0); List_iterator<TABLE_LIST> ti(select_lex->leaf_tables); @@ -3682,7 +3797,7 @@ bool mysql_insert_select_prepare(THD *thd) while ((table= ti++) && insert_tables--) ti.remove(); - DBUG_RETURN(FALSE); + DBUG_RETURN(0); } @@ -3692,8 +3807,10 @@ select_insert::select_insert(THD *thd_arg, TABLE_LIST *table_list_par, List<Item> *update_fields, List<Item> *update_values, enum_duplicates duplic, - bool ignore_check_option_errors): + bool ignore_check_option_errors, + select_result *result): select_result_interceptor(thd_arg), + sel_result(result), table_list(table_list_par), table(table_par), fields(fields_par), autoinc_value_of_last_inserted_row(0), insert_into_view(table_list_par && table_list_par->view != 0) @@ -3712,7 +3829,7 @@ int select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) { LEX *lex= thd->lex; - int res; + int res= 0; table_map map= 0; SELECT_LEX *lex_current_select_save= lex->current_select; DBUG_ENTER("select_insert::prepare"); @@ -3726,17 +3843,18 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) */ lex->current_select= lex->first_select_lex(); - res= (setup_fields(thd, Ref_ptr_array(), - values, MARK_COLUMNS_READ, 0, NULL, 0) || + res= (setup_returning_fields(thd, table_list) || + setup_fields(thd, Ref_ptr_array(), values, MARK_COLUMNS_READ, 0, 0, + 0) || check_insert_fields(thd, table_list, *fields, values, !insert_into_view, 1, &map)); if (!res && fields->elements) { - bool saved_abort_on_warning= thd->abort_on_warning; - thd->abort_on_warning= !info.ignore && thd->is_strict_mode(); - res= check_that_all_fields_are_given_values(thd, table_list->table, table_list); - thd->abort_on_warning= saved_abort_on_warning; + Abort_on_warning_instant_set aws(thd, + !info.ignore && thd->is_strict_mode()); + res= check_that_all_fields_are_given_values(thd, table_list->table, + table_list); } if (info.handle_duplicates == DUP_UPDATE && !res) @@ -3853,13 +3971,18 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) #endif thd->cuted_fields=0; + bool create_lookup_handler= info.handle_duplicates != DUP_ERROR; if (info.ignore || info.handle_duplicates != DUP_ERROR) { + create_lookup_handler= true; table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); - if (table->file->ha_table_flags() & HA_DUPLICATE_POS && - table->file->ha_rnd_init_with_error(0)) - DBUG_RETURN(1); + if (table->file->ha_table_flags() & HA_DUPLICATE_POS) + { + if (table->file->ha_rnd_init_with_error(0)) + DBUG_RETURN(1); + } } + table->file->prepare_for_insert(create_lookup_handler); if (info.handle_duplicates == DUP_REPLACE && (!table->triggers || !table->triggers->has_delete_triggers())) table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); @@ -3890,7 +4013,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) If the result table is the same as one of the source tables (INSERT SELECT), the result table is not finally prepared at the join prepair phase. Do the final preparation now. - + RETURN 0 OK */ @@ -3898,11 +4021,18 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) int select_insert::prepare2(JOIN *) { DBUG_ENTER("select_insert::prepare2"); + if (table->validate_default_values_of_unset_fields(thd)) + DBUG_RETURN(1); + if (thd->lex->describe) + DBUG_RETURN(0); if (thd->lex->current_select->options & OPTION_BUFFER_RESULT && - thd->locked_tables_mode <= LTM_LOCK_TABLES && - !thd->lex->describe) + thd->locked_tables_mode <= LTM_LOCK_TABLES) table->file->ha_start_bulk_insert((ha_rows) 0); - if (table->validate_default_values_of_unset_fields(thd)) + + /* Same as the other variants of INSERT */ + if (sel_result && + sel_result->send_result_set_metadata(thd->lex->returning()->item_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); DBUG_RETURN(0); } @@ -3917,6 +4047,7 @@ void select_insert::cleanup() select_insert::~select_insert() { DBUG_ENTER("~select_insert"); + sel_result= NULL; if (table && table->is_created()) { table->next_number_field=0; @@ -3934,14 +4065,6 @@ int select_insert::send_data(List<Item> &values) DBUG_ENTER("select_insert::send_data"); bool error=0; - if (unit->offset_limit_cnt) - { // using limit offset,count - unit->offset_limit_cnt--; - DBUG_RETURN(0); - } - if (unlikely(thd->killed == ABORT_QUERY)) - DBUG_RETURN(0); - thd->count_cuted_fields= CHECK_FIELD_WARN; // Calculate cuted fields store_values(values); if (table->default_field && @@ -3965,10 +4088,10 @@ int select_insert::send_data(List<Item> &values) } } - error= write_record(thd, table, &info); + error= write_record(thd, table, &info, sel_result); table->vers_write= table->versioned(); table->auto_increment_field_not_null= FALSE; - + if (likely(!error)) { if (table->triggers || info.handle_duplicates == DUP_UPDATE) @@ -4020,13 +4143,13 @@ void select_insert::store_values(List<Item> &values) bool select_insert::prepare_eof() { int error; - bool const trans_table= table->file->has_transactions(); + bool const trans_table= table->file->has_transactions_and_rollback(); bool changed; bool binary_logged= 0; killed_state killed_status= thd->killed; DBUG_ENTER("select_insert::prepare_eof"); - DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'", + DBUG_PRINT("enter", ("trans_table: %d, table_type: '%s'", trans_table, table->file->table_type())); #ifdef WITH_WSREP @@ -4055,13 +4178,13 @@ bool select_insert::prepare_eof() query_cache_invalidate3(thd, table, 1); } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; - thd->transaction.all.m_unsafe_rollback_flags|= - (thd->transaction.stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT); + if (thd->transaction->stmt.modified_non_trans_table) + thd->transaction->all.modified_non_trans_table= TRUE; + thd->transaction->all.m_unsafe_rollback_flags|= + (thd->transaction->stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT); DBUG_ASSERT(trans_table || !changed || - thd->transaction.stmt.modified_non_trans_table); + thd->transaction->stmt.modified_non_trans_table); /* Write to binlog before commiting transaction. No statement will @@ -4070,7 +4193,7 @@ bool select_insert::prepare_eof() ha_autocommit_or_rollback() is issued below. */ if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) && - (likely(!error) || thd->transaction.stmt.modified_non_trans_table)) + (likely(!error) || thd->transaction->stmt.modified_non_trans_table)) { int errcode= 0; int res; @@ -4104,7 +4227,6 @@ bool select_insert::send_ok_packet() { char message[160]; /* status message */ ulonglong row_count; /* rows affected */ ulonglong id; /* last insert-id */ - DBUG_ENTER("select_insert::send_ok_packet"); if (info.ignore) @@ -4126,7 +4248,14 @@ bool select_insert::send_ok_packet() { thd->first_successful_insert_id_in_prev_stmt : (info.copied ? autoinc_value_of_last_inserted_row : 0)); - ::my_ok(thd, row_count, id, message); + /* + Client expects an EOF/OK packet If LEX::has_returning and if result set + meta was sent. See explanation for other variants of INSERT. + */ + if (sel_result) + sel_result->send_eof(); + else + ::my_ok(thd, row_count, id, message); DBUG_RETURN(false); } @@ -4153,7 +4282,7 @@ void select_insert::abort_result_set() table will be assigned with view table structure, but that table will not be opened really (it is dummy to check fields types & Co). */ - if (table && table->file->get_table()) + if (table && table->file->is_open()) { bool changed, transactional_table; /* @@ -4183,12 +4312,12 @@ void select_insert::abort_result_set() zero, so no check for that is made. */ changed= (info.copied || info.deleted || info.updated); - transactional_table= table->file->has_transactions(); - if (thd->transaction.stmt.modified_non_trans_table || + transactional_table= table->file->has_transactions_and_rollback(); + if (thd->transaction->stmt.modified_non_trans_table || thd->log_current_statement) { if (!can_rollback_data()) - thd->transaction.all.modified_non_trans_table= TRUE; + thd->transaction->all.modified_non_trans_table= TRUE; if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) { @@ -4204,7 +4333,7 @@ void select_insert::abort_result_set() query_cache_invalidate3(thd, table, 1); } DBUG_ASSERT(transactional_table || !changed || - thd->transaction.stmt.modified_non_trans_table); + thd->transaction->stmt.modified_non_trans_table); table->s->table_creation_was_logged|= binary_logged; table->file->ha_release_auto_increment(); @@ -4218,11 +4347,11 @@ void select_insert::abort_result_set() CREATE TABLE (SELECT) ... ***************************************************************************/ -Field *Item::create_field_for_create_select(TABLE *table) +Field *Item::create_field_for_create_select(MEM_ROOT *root, TABLE *table) { static Tmp_field_param param(false, false, false, false); Tmp_field_src src; - return create_tmp_field_ex(table, &src, ¶m); + return create_tmp_field_ex(root, table, &src, ¶m); } @@ -4294,7 +4423,8 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items, while ((item=it++)) { - Field *tmp_field= item->create_field_for_create_select(&tmp_table); + Field *tmp_field= item->create_field_for_create_select(thd->mem_root, + &tmp_table); if (!tmp_field) DBUG_RETURN(NULL); @@ -4464,6 +4594,18 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items, /* purecov: end */ } table->s->table_creation_was_logged= save_table_creation_was_logged; + if (!table->s->tmp_table) + table->file->prepare_for_row_logging(); + + /* + If slave is converting a statement event to row events, log the original + create statement as an annotated row + */ +#ifdef HAVE_REPLICATION + if (thd->slave_thread && opt_replicate_annotate_row_events && + thd->is_current_stmt_binlog_format_row()) + thd->variables.binlog_annotate_row_events= 1; +#endif DBUG_RETURN(table); } @@ -4521,13 +4663,9 @@ select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u) return error; TABLE const *const table = *tables; - if (thd->is_current_stmt_binlog_format_row() && + if (thd->is_current_stmt_binlog_format_row() && !table->s->tmp_table) - { - int error; - if (unlikely((error= ptr->binlog_show_create_table(tables, count)))) - return error; - } + return binlog_show_create_table(thd, *tables, ptr->create_info); return 0; } select_create *ptr; @@ -4611,13 +4749,18 @@ select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u) restore_record(table,s->default_values); // Get empty record thd->cuted_fields=0; + bool create_lookup_handler= info.handle_duplicates != DUP_ERROR; if (info.ignore || info.handle_duplicates != DUP_ERROR) { + create_lookup_handler= true; table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); - if (table->file->ha_table_flags() & HA_DUPLICATE_POS && - table->file->ha_rnd_init_with_error(0)) - DBUG_RETURN(1); + if (table->file->ha_table_flags() & HA_DUPLICATE_POS) + { + if (table->file->ha_rnd_init_with_error(0)) + DBUG_RETURN(1); + } } + table->file->prepare_for_insert(create_lookup_handler); if (info.handle_duplicates == DUP_REPLACE && (!table->triggers || !table->triggers->has_delete_triggers())) table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); @@ -4635,8 +4778,9 @@ select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u) DBUG_RETURN(0); } -int -select_create::binlog_show_create_table(TABLE **tables, uint count) + +static int binlog_show_create_table(THD *thd, TABLE *table, + Table_specification_st *create_info) { /* Note 1: In RBR mode, we generate a CREATE TABLE statement for the @@ -4655,14 +4799,12 @@ select_create::binlog_show_create_table(TABLE **tables, uint count) statement transaction cache. */ DBUG_ASSERT(thd->is_current_stmt_binlog_format_row()); - DBUG_ASSERT(tables && *tables && count > 0); - StringBuffer<2048> query(system_charset_info); int result; TABLE_LIST tmp_table_list; tmp_table_list.reset(); - tmp_table_list.table = *tables; + tmp_table_list.table = table; result= show_create_table(thd, &tmp_table_list, &query, create_info, WITH_DB_NAME); @@ -4691,6 +4833,88 @@ select_create::binlog_show_create_table(TABLE **tables, uint count) return result; } + +/** + Log CREATE TABLE to binary log + + @param thd Thread handler + @param table Log create statement for this table + + This function is called from ALTER TABLE for a shared table converted + to a not shared table. +*/ + +bool binlog_create_table(THD *thd, TABLE *table, bool replace) +{ + Table_specification_st create_info; + bool result; + ulonglong save_option_bits; + + /* Don't log temporary tables in row format */ + if (thd->variables.binlog_format == BINLOG_FORMAT_ROW && + table->s->tmp_table) + return 0; + if (!thd->binlog_table_should_be_logged(&table->s->db)) + return 0; + + /* + We have to use ROW format to ensure that future row inserts will be + logged + */ + thd->set_current_stmt_binlog_format_row(); + table->file->prepare_for_row_logging(); + + create_info.lex_start(); + save_option_bits= thd->variables.option_bits; + if (replace) + create_info.set(DDL_options_st::OPT_OR_REPLACE); + /* Ensure we write ENGINE=xxx and CHARSET=... to binary log */ + create_info.used_fields|= (HA_CREATE_USED_ENGINE | + HA_CREATE_USED_DEFAULT_CHARSET); + /* Ensure we write all engine options to binary log */ + create_info.used_fields|= HA_CREATE_PRINT_ALL_OPTIONS; + result= binlog_show_create_table(thd, table, &create_info) != 0; + thd->variables.option_bits= save_option_bits; + return result; +} + + +/** + Log DROP TABLE to binary log + + @param thd Thread handler + @param table Log create statement for this table + + This function is called from ALTER TABLE for a shared table converted + to a not shared table. +*/ + +bool binlog_drop_table(THD *thd, TABLE *table) +{ + StringBuffer<2048> query(system_charset_info); + /* Don't log temporary tables in row format */ + if (!table->s->table_creation_was_logged) + return 0; + if (!thd->binlog_table_should_be_logged(&table->s->db)) + return 0; + + query.append("DROP "); + if (table->s->tmp_table) + query.append("TEMPORARY "); + query.append("TABLE IF EXISTS "); + append_identifier(thd, &query, &table->s->db); + query.append("."); + append_identifier(thd, &query, &table->s->table_name); + + return thd->binlog_query(THD::STMT_QUERY_TYPE, + query.ptr(), query.length(), + /* is_trans */ TRUE, + /* direct */ FALSE, + /* suppress_use */ TRUE, + 0) > 0; +} + + void select_create::store_values(List<Item> &values) { fill_record_n_invoke_before_triggers(thd, table, field, values, 1, @@ -4708,7 +4932,10 @@ bool select_create::send_eof() mark the flag at this point. */ if (table->s->tmp_table) - thd->transaction.stmt.mark_created_temp_table(); + thd->transaction->stmt.mark_created_temp_table(); + + if (thd->slave_thread) + thd->variables.binlog_annotate_row_events= 0; if (prepare_eof()) { @@ -4872,7 +5099,7 @@ void select_create::abort_result_set() save_option_bits= thd->variables.option_bits; thd->variables.option_bits&= ~OPTION_BIN_LOG; select_insert::abort_result_set(); - thd->transaction.stmt.modified_non_trans_table= FALSE; + thd->transaction->stmt.modified_non_trans_table= FALSE; thd->variables.option_bits= save_option_bits; /* possible error of writing binary log is ignored deliberately */ |