summaryrefslogtreecommitdiff
path: root/sql/sql_insert.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r--sql/sql_insert.cc327
1 files changed, 178 insertions, 149 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 9db6acf73f8..c969725bea4 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -82,6 +82,10 @@
#include "debug_sync.h"
+#ifdef WITH_WSREP
+#include "wsrep_trans_observer.h" /* wsrep_start_transction() */
+#endif /* WITH_WSREP */
+
#ifndef EMBEDDED_LIBRARY
static bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
TABLE_LIST *table_list);
@@ -241,7 +245,7 @@ static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
}
else
{ // Part field list
- SELECT_LEX *select_lex= &thd->lex->select_lex;
+ SELECT_LEX *select_lex= thd->lex->first_select_lex();
Name_resolution_context *context= &select_lex->context;
Name_resolution_context_state ctx_state;
int res;
@@ -273,7 +277,7 @@ static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
/* Restore the current context. */
ctx_state.restore_state(context, table_list);
- thd->lex->select_lex.no_wrap_view_item= FALSE;
+ thd->lex->first_select_lex()->no_wrap_view_item= FALSE;
if (res)
DBUG_RETURN(-1);
@@ -547,10 +551,10 @@ bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list)
If this goes ok, the tickets are cloned and added to the list of granted
locks held by the handler thread.
*/
- if (thd->global_read_lock.can_acquire_protection())
+ if (thd->has_read_only_protection())
DBUG_RETURN(TRUE);
- protection_request.init(MDL_key::GLOBAL, "", "", MDL_INTENTION_EXCLUSIVE,
+ protection_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_DML,
MDL_STATEMENT);
if (thd->mdl_context.acquire_lock(&protection_request,
@@ -640,7 +644,7 @@ create_insert_stmt_from_insert_delayed(THD *thd, String *buf)
if (buf->append(thd->query()) ||
buf->replace(thd->lex->keyword_delayed_begin_offset,
thd->lex->keyword_delayed_end_offset -
- thd->lex->keyword_delayed_begin_offset, 0))
+ thd->lex->keyword_delayed_begin_offset, NULL, 0))
return 1;
return 0;
}
@@ -657,7 +661,7 @@ static void save_insert_query_plan(THD* thd, TABLE_LIST *table_list)
bool skip= MY_TEST(table_list->view);
/* Save subquery children */
- for (SELECT_LEX_UNIT *unit= thd->lex->select_lex.first_inner_unit();
+ for (SELECT_LEX_UNIT *unit= thd->lex->first_select_lex()->first_inner_unit();
unit;
unit= unit->next_unit())
{
@@ -777,7 +781,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
/* mysql_prepare_insert sets table_list->table if it was not set */
table= table_list->table;
- context= &thd->lex->select_lex.context;
+ context= &thd->lex->first_select_lex()->context;
/*
These three asserts test the hypothesis that the resetting of the name
resolution context below is not necessary at all since the list of local
@@ -900,6 +904,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
using_bulk_insert= 1;
table->file->ha_start_bulk_insert(values_list.elements);
}
+ else
+ table->file->ha_reset_copy_info();
}
thd->abort_on_warning= !ignore && thd->is_strict_mode();
@@ -1079,7 +1085,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
} while (bulk_parameters_iterations(thd));
values_loop_end:
- free_underlaid_joins(thd, &thd->lex->select_lex);
+ free_underlaid_joins(thd, thd->lex->first_select_lex());
joins_freed= TRUE;
/*
@@ -1103,11 +1109,23 @@ values_loop_end:
auto_inc values from the delayed_insert thread as they share TABLE.
*/
table->file->ha_release_auto_increment();
- if (using_bulk_insert && unlikely(table->file->ha_end_bulk_insert()) &&
- !error)
+ if (using_bulk_insert)
+ {
+ if (unlikely(table->file->ha_end_bulk_insert()) &&
+ !error)
+ {
+ table->file->print_error(my_errno,MYF(0));
+ error=1;
+ }
+ }
+ /* Get better status from handler if handler supports it */
+ if (table->file->copy_info.records)
{
- table->file->print_error(my_errno,MYF(0));
- error=1;
+ DBUG_ASSERT(info.copied >= table->file->copy_info.copied);
+ info.touched= table->file->copy_info.touched;
+ info.copied= table->file->copy_info.copied;
+ info.deleted= table->file->copy_info.deleted;
+ info.updated= table->file->copy_info.updated;
}
if (duplic != DUP_ERROR || ignore)
{
@@ -1230,8 +1248,12 @@ values_loop_end:
retval= thd->lex->explain->send_explain(thd);
goto abort;
}
- if ((iteration * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
- !thd->cuted_fields))
+ DBUG_PRINT("info", ("touched: %llu copied: %llu updated: %llu deleted: %llu",
+ (ulonglong) info.touched, (ulonglong) info.copied,
+ (ulonglong) info.updated, (ulonglong) info.deleted));
+
+ if ((iteration * values_list.elements) == 1 &&
+ (!(thd->variables.option_bits & OPTION_WARNINGS) || !thd->cuted_fields))
{
my_ok(thd, info.copied + info.deleted +
((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
@@ -1272,7 +1294,7 @@ abort:
table->file->ha_release_auto_increment();
if (!joins_freed)
- free_underlaid_joins(thd, &thd->lex->select_lex);
+ free_underlaid_joins(thd, thd->lex->first_select_lex());
thd->abort_on_warning= 0;
DBUG_RETURN(retval);
}
@@ -1302,7 +1324,7 @@ abort:
static bool check_view_insertability(THD * thd, TABLE_LIST *view)
{
- uint num= view->view->select_lex.item_list.elements;
+ uint num= view->view->first_select_lex()->item_list.elements;
TABLE *table= view->table;
Field_translator *trans_start= view->field_translation,
*trans_end= trans_start + num;
@@ -1402,10 +1424,12 @@ static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
than INSERT.
*/
- if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
- &thd->lex->select_lex.top_join_list,
+ if (setup_tables_and_check_access(thd,
+ &thd->lex->first_select_lex()->context,
+ &thd->lex->first_select_lex()->
+ top_join_list,
table_list,
- thd->lex->select_lex.leaf_tables,
+ thd->lex->first_select_lex()->leaf_tables,
select_insert, INSERT_ACL, SELECT_ACL,
TRUE))
DBUG_RETURN(TRUE);
@@ -1413,7 +1437,7 @@ static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
if (insert_into_view && !fields.elements)
{
thd->lex->empty_field_list_on_rset= 1;
- if (!thd->lex->select_lex.leaf_tables.head()->table ||
+ if (!thd->lex->first_select_lex()->leaf_tables.head()->table ||
table_list->is_multitable())
{
my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
@@ -1487,7 +1511,7 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
enum_duplicates duplic, COND **where,
bool select_insert)
{
- SELECT_LEX *select_lex= &thd->lex->select_lex;
+ SELECT_LEX *select_lex= thd->lex->first_select_lex();
Name_resolution_context *context= &select_lex->context;
Name_resolution_context_state ctx_state;
bool insert_into_view= (table_list->view != 0);
@@ -1737,7 +1761,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
*/
if (info->ignore)
{
- table->file->print_error(error, MYF(ME_JUST_WARNING));
+ table->file->print_error(error, MYF(ME_WARNING));
goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
}
goto err;
@@ -1756,10 +1780,8 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
was used. This ensures that we don't get a problem when the
whole range of the key has been used.
*/
- if (info->handle_duplicates == DUP_REPLACE &&
- table->next_number_field &&
- key_nr == table->s->next_number_index &&
- (insert_id_for_cur_row > 0))
+ if (info->handle_duplicates == DUP_REPLACE && table->next_number_field &&
+ key_nr == table->s->next_number_index && insert_id_for_cur_row > 0)
goto err;
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
{
@@ -1861,7 +1883,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
{
if (!(thd->variables.old_behavior &
OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE))
- table->file->print_error(error, MYF(ME_JUST_WARNING));
+ table->file->print_error(error, MYF(ME_WARNING));
goto ok_or_after_trg_err;
}
goto err;
@@ -2043,7 +2065,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
goto err;
if (!(thd->variables.old_behavior &
OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE))
- table->file->print_error(error, MYF(ME_JUST_WARNING));
+ table->file->print_error(error, MYF(ME_WARNING));
table->file->restore_auto_increment(prev_insert_id);
goto ok_or_after_trg_err;
}
@@ -2205,11 +2227,11 @@ public:
mysql_mutex_init(key_delayed_insert_mutex, &mutex, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_delayed_insert_cond, &cond, NULL);
mysql_cond_init(key_delayed_insert_cond_client, &cond_client, NULL);
- mysql_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_delayed_insert);
delayed_insert_threads++;
+ mysql_mutex_unlock(&LOCK_delayed_insert);
delayed_lock= global_system_variables.low_priority_updates ?
TL_WRITE_LOW_PRIORITY : TL_WRITE;
- mysql_mutex_unlock(&LOCK_thread_count);
DBUG_VOID_RETURN;
}
~Delayed_insert()
@@ -2221,21 +2243,15 @@ public:
if (table)
{
close_thread_tables(&thd);
- thd.mdl_context.release_transactional_locks();
+ thd.mdl_context.release_transactional_locks(&thd);
}
mysql_mutex_destroy(&mutex);
mysql_cond_destroy(&cond);
mysql_cond_destroy(&cond_client);
- /*
- We could use unlink_not_visible_threads() here, but as
- delayed_insert_threads also needs to be protected by
- the LOCK_thread_count mutex, we open code this.
- */
- mysql_mutex_lock(&LOCK_thread_count);
- thd.unlink(); // Must be unlinked under lock
+ server_threads.erase(&thd);
+ mysql_mutex_assert_owner(&LOCK_delayed_insert);
delayed_insert_threads--;
- mysql_mutex_unlock(&LOCK_thread_count);
my_free(thd.query());
thd.security_ctx->user= 0;
@@ -2382,7 +2398,7 @@ bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
di->thd.set_db(&table_list->db);
di->thd.set_query(my_strndup(table_list->table_name.str,
table_list->table_name.length,
- MYF(MY_WME | ME_FATALERROR)),
+ MYF(MY_WME | ME_FATAL)),
table_list->table_name.length, system_charset_info);
if (di->thd.db.str == NULL || di->thd.query() == NULL)
{
@@ -2395,9 +2411,12 @@ bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
di->table_list.alias.str= di->table_list.table_name.str= di->thd.query();
di->table_list.alias.length= di->table_list.table_name.length= di->thd.query_length();
di->table_list.db= di->thd.db;
- /* We need the tickets so that they can be cloned in handle_delayed_insert */
- di->grl_protection.init(MDL_key::GLOBAL, "", "",
- MDL_INTENTION_EXCLUSIVE, MDL_STATEMENT);
+ /*
+ We need the tickets so that they can be cloned in
+ handle_delayed_insert
+ */
+ di->grl_protection.init(MDL_key::BACKUP, "", "",
+ MDL_BACKUP_DML, MDL_STATEMENT);
di->grl_protection.ticket= grl_protection_request->ticket;
init_mdl_requests(&di->table_list);
di->table_list.mdl_request.ticket= table_list->mdl_request.ticket;
@@ -2414,7 +2433,7 @@ bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
mysql_mutex_unlock(&di->mutex);
di->unlock();
delete di;
- my_error(ER_CANT_CREATE_THREAD, MYF(ME_FATALERROR), error);
+ my_error(ER_CANT_CREATE_THREAD, MYF(ME_FATAL), error);
goto end_create;
}
@@ -2472,10 +2491,12 @@ bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
}
/* Unlock the delayed insert object after its last access. */
di->unlock();
- DBUG_RETURN((table_list->table == NULL));
+ DBUG_PRINT("exit", ("table_list->table: %p", table_list->table));
+ DBUG_RETURN(thd->is_error());
end_create:
mysql_mutex_unlock(&LOCK_delayed_create);
+ DBUG_PRINT("exit", ("is_error: %d", thd->is_error()));
DBUG_RETURN(thd->is_error());
}
@@ -2530,24 +2551,27 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd)
if (thd.killed)
{
/*
- Copy the error message. Note that we don't treat fatal
- errors in the delayed thread as fatal errors in the
- main thread. If delayed thread was killed, we don't
- want to send "Server shutdown in progress" in the
- INSERT THREAD.
-
- The thread could be killed with an error message if
- di->handle_inserts() or di->open_and_lock_table() fails.
- The thread could be killed without an error message if
- killed using THD::notify_shared_lock() or
- kill_delayed_threads_for_table().
+ Check how the insert thread was killed. If it was killed
+ by FLUSH TABLES which calls kill_delayed_threads_for_table(),
+ then is_error is not set.
+ In this case, return without setting an error,
+ which means that the insert will be converted to a normal insert.
*/
- if (!thd.is_error())
- my_message(ER_QUERY_INTERRUPTED, ER_THD(&thd, ER_QUERY_INTERRUPTED),
- MYF(0));
- else
+ if (thd.is_error())
+ {
+ /*
+ Copy the error message. Note that we don't treat fatal
+ errors in the delayed thread as fatal errors in the
+ main thread. If delayed thread was killed, we don't
+ want to send "Server shutdown in progress" in the
+ INSERT THREAD.
+
+ The thread could be killed with an error message if
+ di->handle_inserts() or di->open_and_lock_table() fails.
+ */
my_message(thd.get_stmt_da()->sql_errno(),
thd.get_stmt_da()->message(), MYF(0));
+ }
goto error;
}
}
@@ -2629,10 +2653,6 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd)
share->default_fields)
{
bool error_reported= FALSE;
- if (unlikely(!(copy->def_vcol_set=
- (MY_BITMAP*) alloc_root(client_thd->mem_root,
- sizeof(MY_BITMAP)))))
- goto error;
if (unlikely(parse_vcol_defs(client_thd, client_thd->mem_root, copy,
&error_reported,
VCOL_INIT_DEPENDENCY_FAILURE_IS_WARNING)))
@@ -2652,15 +2672,6 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd)
copy->def_write_set.bitmap= ((my_bitmap_map*)
(bitmap + share->column_bitmap_size));
bitmaps_used= 2;
- if (share->virtual_fields)
- {
- my_bitmap_init(copy->def_vcol_set,
- (my_bitmap_map*) (bitmap +
- bitmaps_used*share->column_bitmap_size),
- share->fields, FALSE);
- bitmaps_used++;
- copy->vcol_set= copy->def_vcol_set;
- }
if (share->default_fields || share->default_expressions)
{
my_bitmap_init(&copy->has_value_set,
@@ -2858,23 +2869,7 @@ void kill_delayed_threads(void)
mysql_mutex_lock(&di->thd.LOCK_thd_kill);
if (di->thd.killed < KILL_CONNECTION)
di->thd.set_killed_no_mutex(KILL_CONNECTION);
- if (di->thd.mysys_var)
- {
- mysql_mutex_lock(&di->thd.mysys_var->mutex);
- if (di->thd.mysys_var->current_cond)
- {
- /*
- We need the following test because the main mutex may be locked
- in handle_delayed_insert()
- */
- if (&di->mutex != di->thd.mysys_var->current_mutex)
- mysql_mutex_lock(di->thd.mysys_var->current_mutex);
- mysql_cond_broadcast(di->thd.mysys_var->current_cond);
- if (&di->mutex != di->thd.mysys_var->current_mutex)
- mysql_mutex_unlock(di->thd.mysys_var->current_mutex);
- }
- mysql_mutex_unlock(&di->thd.mysys_var->mutex);
- }
+ di->thd.abort_current_cond_wait(false);
mysql_mutex_unlock(&di->thd.LOCK_thd_kill);
}
mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
@@ -2999,7 +2994,7 @@ pthread_handler_t handle_delayed_insert(void *arg)
pthread_detach_this_thread();
/* Add thread to THD list so that's it's visible in 'show processlist' */
thd->set_start_time();
- add_to_active_threads(thd);
+ server_threads.insert(thd);
if (abort_loop)
thd->set_killed(KILL_CONNECTION);
else
@@ -3157,11 +3152,30 @@ pthread_handler_t handle_delayed_insert(void *arg)
mysql_mutex_unlock(&di->thd.mysys_var->mutex);
mysql_mutex_lock(&di->mutex);
}
+
+ /*
+ The code depends on that the following ASSERT always hold.
+ I don't want to accidently introduce and bugs in the following code
+ in this commit, so I leave the small cleaning up of the code to
+ a future commit
+ */
+ DBUG_ASSERT(thd->lock || di->stacked_inserts == 0);
+
DBUG_PRINT("delayed",
- ("thd->killed: %d di->tables_in_use: %d thd->lock: %d",
- thd->killed, di->tables_in_use, thd->lock != 0));
+ ("thd->killed: %d di->status: %d di->stacked_insert: %d di->tables_in_use: %d thd->lock: %d",
+ thd->killed, di->status, di->stacked_inserts, di->tables_in_use, thd->lock != 0));
- if (di->tables_in_use && ! thd->lock && !thd->killed)
+ /*
+ This is used to test see what happens if killed is sent before
+ we have time to handle the insert requests.
+ */
+ DBUG_EXECUTE_IF("write_delay_wakeup",
+ if (!thd->killed && di->stacked_inserts)
+ my_sleep(500000);
+ );
+
+ if (di->tables_in_use && ! thd->lock &&
+ (!thd->killed || di->stacked_inserts))
{
/*
Request for new delayed insert.
@@ -3335,7 +3349,7 @@ bool Delayed_insert::handle_inserts(void)
or if another thread is removing the current table definition
from the table cache.
*/
- my_error(ER_DELAYED_CANT_CHANGE_LOCK, MYF(ME_FATALERROR | ME_NOREFRESH),
+ my_error(ER_DELAYED_CANT_CHANGE_LOCK, MYF(ME_FATAL | ME_ERROR_LOG),
table->s->table_name.str);
goto err;
}
@@ -3511,7 +3525,7 @@ bool Delayed_insert::handle_inserts(void)
{
/* This is not known to happen. */
my_error(ER_DELAYED_CANT_CHANGE_LOCK,
- MYF(ME_FATALERROR | ME_NOREFRESH),
+ MYF(ME_FATAL | ME_ERROR_LOG),
table->s->table_name.str);
goto err;
}
@@ -3609,7 +3623,7 @@ bool Delayed_insert::handle_inserts(void)
bool mysql_insert_select_prepare(THD *thd)
{
LEX *lex= thd->lex;
- SELECT_LEX *select_lex= &lex->select_lex;
+ SELECT_LEX *select_lex= lex->first_select_lex();
DBUG_ENTER("mysql_insert_select_prepare");
@@ -3698,7 +3712,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
select, LEX::current_select should point to the first select while
we are fixing fields from insert list.
*/
- lex->current_select= &lex->select_lex;
+ lex->current_select= lex->first_select_lex();
res= (setup_fields(thd, Ref_ptr_array(),
values, MARK_COLUMNS_READ, 0, NULL, 0) ||
@@ -3715,7 +3729,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
if (info.handle_duplicates == DUP_UPDATE && !res)
{
- Name_resolution_context *context= &lex->select_lex.context;
+ Name_resolution_context *context= &lex->first_select_lex()->context;
Name_resolution_context_state ctx_state;
/* Save the state of the current name resolution context. */
@@ -3725,7 +3739,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
table_list->next_local= 0;
context->resolve_in_table_list_only(table_list);
- lex->select_lex.no_wrap_view_item= TRUE;
+ lex->first_select_lex()->no_wrap_view_item= TRUE;
res= res ||
check_update_fields(thd, context->table_list,
*info.update_fields, *info.update_values,
@@ -3736,22 +3750,26 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
*/
true,
&map);
- lex->select_lex.no_wrap_view_item= FALSE;
+ lex->first_select_lex()->no_wrap_view_item= FALSE;
/*
- When we are not using GROUP BY and there are no ungrouped aggregate functions
- we can refer to other tables in the ON DUPLICATE KEY part.
- We use next_name_resolution_table descructively, so check it first (views?)
+ When we are not using GROUP BY and there are no ungrouped
+ aggregate functions we can refer to other tables in the ON
+ DUPLICATE KEY part. We use next_name_resolution_table
+ descructively, so check it first (views?)
*/
DBUG_ASSERT (!table_list->next_name_resolution_table);
- if (lex->select_lex.group_list.elements == 0 &&
- !lex->select_lex.with_sum_func)
+ if (lex->first_select_lex()->group_list.elements == 0 &&
+ !lex->first_select_lex()->with_sum_func)
+ {
/*
- We must make a single context out of the two separate name resolution contexts :
- the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
- To do that we must concatenate the two lists
+ We must make a single context out of the two separate name
+ resolution contexts : the INSERT table and the tables in the
+ SELECT part of INSERT ... SELECT. To do that we must
+ concatenate the two lists
*/
table_list->next_name_resolution_table=
ctx_state.get_first_name_resolution_table();
+ }
res= res || setup_fields(thd, Ref_ptr_array(), *info.update_values,
MARK_COLUMNS_READ, 0, NULL, 0);
@@ -3857,9 +3875,9 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
void
DESCRIPTION
- If the result table is the same as one of the source tables (INSERT SELECT),
- the result table is not finally prepared at the join prepair phase.
- Do the final preparation now.
+ If the result table is the same as one of the source tables
+ (INSERT SELECT), the result table is not finally prepared at the
+ join prepair phase. Do the final preparation now.
RETURN
0 OK
@@ -3999,10 +4017,13 @@ bool select_insert::prepare_eof()
DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
trans_table, table->file->table_type()));
- error= (IF_WSREP((thd->wsrep_conflict_state == MUST_ABORT ||
- thd->wsrep_conflict_state == CERT_FAILURE) ? -1 :, )
- (thd->locked_tables_mode <= LTM_LOCK_TABLES ?
- table->file->ha_end_bulk_insert() : 0));
+#ifdef WITH_WSREP
+ error= (thd->wsrep_cs().current_error()) ? -1 :
+ (thd->locked_tables_mode <= LTM_LOCK_TABLES) ?
+#else
+ error= (thd->locked_tables_mode <= LTM_LOCK_TABLES) ?
+#endif /* WITH_WSREP */
+ table->file->ha_end_bulk_insert() : 0;
if (likely(!error) && unlikely(thd->is_error()))
error= thd->get_stmt_da()->sql_errno();
@@ -4187,9 +4208,9 @@ void select_insert::abort_result_set()
Field *Item::create_field_for_create_select(TABLE *table)
{
- Field *def_field, *tmp_field;
- return ::create_tmp_field(table->in_use, table, this, type(),
- (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0);
+ static Tmp_field_param param(false, false, false, false);
+ Tmp_field_src src;
+ return create_tmp_field_ex(table, &src, &param);
}
@@ -4232,10 +4253,8 @@ Field *Item::create_field_for_create_select(TABLE *table)
@retval 0 Error
*/
-TABLE *select_create::create_table_from_items(THD *thd,
- List<Item> *items,
- MYSQL_LOCK **lock,
- TABLEOP_HOOKS *hooks)
+TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
+ MYSQL_LOCK **lock, TABLEOP_HOOKS *hooks)
{
TABLE tmp_table; // Used during 'Create_field()'
TABLE_SHARE share;
@@ -4258,7 +4277,7 @@ TABLE *select_create::create_table_from_items(THD *thd,
if (!opt_explicit_defaults_for_timestamp)
promote_first_timestamp_column(&alter_info->create_list);
- if (create_info->vers_fix_system_fields(thd, alter_info, *create_table))
+ if (create_info->fix_create_fields(thd, alter_info, *create_table))
DBUG_RETURN(NULL);
while ((item=it++))
@@ -4297,10 +4316,10 @@ TABLE *select_create::create_table_from_items(THD *thd,
alter_info->create_list.push_back(cr_field, thd->mem_root);
}
- if (create_info->vers_check_system_fields(thd, alter_info,
- create_table->table_name,
- create_table->db,
- select_field_count))
+ if (create_info->check_fields(thd, alter_info,
+ create_table->table_name,
+ create_table->db,
+ select_field_count))
DBUG_RETURN(NULL);
DEBUG_SYNC(thd,"create_table_select_before_create");
@@ -4521,8 +4540,6 @@ select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u)
thd->binlog_start_trans_and_stmt();
}
- DEBUG_SYNC(thd,"create_table_select_before_check_if_exists");
-
if (!(table= create_table_from_items(thd, &values, &extra_lock, hook_ptr)))
/* abort() deletes table */
DBUG_RETURN(-1);
@@ -4640,9 +4657,16 @@ select_create::binlog_show_create_table(TABLE **tables, uint count)
/* suppress_use */ FALSE,
errcode) > 0;
}
-
- ha_fake_trx_id(thd);
-
+#ifdef WITH_WSREP
+ if (thd->wsrep_trx().active())
+ {
+ WSREP_DEBUG("transaction already started for CTAS");
+ }
+ else
+ {
+ wsrep_start_transaction(thd, thd->wsrep_next_trx_id());
+ }
+#endif
return result;
}
@@ -4700,10 +4724,19 @@ bool select_create::send_eof()
if (!table->s->tmp_table)
{
#ifdef WITH_WSREP
- if (WSREP_ON)
+ if (WSREP(thd) &&
+ table->file->ht->db_type == DB_TYPE_INNODB)
{
+ if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID)
+ {
+ wsrep_start_transaction(thd, thd->wsrep_next_trx_id());
+ }
+ DBUG_ASSERT(thd->wsrep_trx_id() != WSREP_UNDEFINED_TRX_ID);
+ WSREP_DEBUG("CTAS key append for trx: %" PRIu64 " thd %llu query %lld ",
+ thd->wsrep_trx_id(), thd->thread_id, thd->query_id);
+
/*
- append table level exclusive key for CTAS
+ append table level exclusive key for CTAS
*/
wsrep_key_arr_t key_arr= {0, 0};
wsrep_prepare_keys_for_isolation(thd,
@@ -4711,15 +4744,11 @@ bool select_create::send_eof()
create_table->table_name.str,
table_list,
&key_arr);
- int rcode = wsrep->append_key(
- wsrep,
- &thd->wsrep_ws_handle,
- key_arr.keys, //&wkey,
- key_arr.keys_len,
- WSREP_KEY_EXCLUSIVE,
- false);
+ int rcode= wsrep_thd_append_key(thd, key_arr.keys, key_arr.keys_len,
+ WSREP_SERVICE_KEY_EXCLUSIVE);
wsrep_keys_free(&key_arr);
- if (rcode) {
+ if (rcode)
+ {
DBUG_PRINT("wsrep", ("row key failed: %d", rcode));
WSREP_ERROR("Appending table key for CTAS failed: %s, %d",
(wsrep_thd_query(thd)) ?
@@ -4728,22 +4757,22 @@ bool select_create::send_eof()
DBUG_RETURN(true);
}
/* If commit fails, we should be able to reset the OK status. */
- thd->get_stmt_da()->set_overwrite_status(TRUE);
+ thd->get_stmt_da()->set_overwrite_status(true);
}
#endif /* WITH_WSREP */
trans_commit_stmt(thd);
if (!(thd->variables.option_bits & OPTION_GTID_BEGIN))
trans_commit_implicit(thd);
#ifdef WITH_WSREP
- if (WSREP_ON)
+ if (WSREP(thd))
{
thd->get_stmt_da()->set_overwrite_status(FALSE);
mysql_mutex_lock(&thd->LOCK_thd_data);
- if (thd->wsrep_conflict_state != NO_CONFLICT)
+ if (wsrep_current_error(thd))
{
- WSREP_DEBUG("select_create commit failed, thd: %lld err: %d %s",
- (longlong) thd->thread_id, thd->wsrep_conflict_state,
- thd->query());
+ WSREP_DEBUG("select_create commit failed, thd: %llu err: %s %s",
+ thd->thread_id,
+ wsrep_thd_transaction_state_str(thd), WSREP_QUERY(thd));
mysql_mutex_unlock(&thd->LOCK_thd_data);
abort_result_set();
DBUG_RETURN(true);