diff options
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r-- | sql/sql_insert.cc | 92 |
1 files changed, 47 insertions, 45 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index e9a36629c66..dbec84b6d1a 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -872,6 +872,10 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, */ query_cache_invalidate3(thd, table_list, 1); } + + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + if ((changed && error <= 0) || thd->transaction.stmt.modified_non_trans_table || was_insert_delayed) @@ -910,15 +914,13 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, */ DBUG_ASSERT(thd->killed != THD::KILL_BAD_DATA || error > 0); if (thd->binlog_query(THD::ROW_QUERY_TYPE, - thd->query(), thd->query_length(), - transactional_table, FALSE, - errcode)) + thd->query(), thd->query_length(), + transactional_table, FALSE, FALSE, + errcode)) { error=1; } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } DBUG_ASSERT(transactional_table || !changed || thd->transaction.stmt.modified_non_trans_table); @@ -1739,6 +1741,7 @@ public: table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0), group_count(0) { + DBUG_ENTER("Delayed_insert constructor"); thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user; thd.security_ctx->host=(char*) my_localhost; thd.current_tablenr=0; @@ -1747,11 +1750,21 @@ public: thd.lex->current_select= 0; // for my_message_sql thd.lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock() /* - Statement-based replication of INSERT DELAYED has problems with RAND() - and user vars, so in mixed mode we go to row-based. + Statement-based replication of INSERT DELAYED has problems with + RAND() and user variables, so in mixed mode we go to row-based. + For normal commands, the unsafe flag is set at parse time. + However, since the flag is a member of the THD object, of which + the delayed_insert thread has its own copy, we must set the + statement to unsafe here and explicitly set row logging mode. + + @todo set_current_stmt_binlog_format_row_if_mixed should not be + called by anything else than thd->decide_logging_format(). When + we call set_current_blah here, none of the checks in + decide_logging_format is made. We should probably call + thd->decide_logging_format() directly instead. /Sven */ - thd.lex->set_stmt_unsafe(); - thd.set_current_stmt_binlog_row_based_if_mixed(); + thd.lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_DELAYED); + thd.set_current_stmt_binlog_format_row_if_mixed(); bzero((char*) &thd.net, sizeof(thd.net)); // Safety bzero((char*) &table_list, sizeof(table_list)); // Safety @@ -1766,6 +1779,7 @@ public: delayed_lock= global_system_variables.low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE; VOID(pthread_mutex_unlock(&LOCK_thread_count)); + DBUG_VOID_RETURN; } ~Delayed_insert() { @@ -2310,12 +2324,6 @@ static void handle_delayed_insert_impl(THD *thd, Delayed_insert *di) */ lex_start(thd); thd->lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock() - /* - Statement-based replication of INSERT DELAYED has problems with RAND() - and user vars, so in mixed mode we go to row-based. - */ - thd->lex->set_stmt_unsafe(); - thd->set_current_stmt_binlog_row_based_if_mixed(); /* Open table */ if (!(di->table= open_n_lock_single_table(thd, &di->table_list, @@ -2584,6 +2592,7 @@ bool Delayed_insert::handle_inserts(void) { int error; ulong max_rows; + bool has_trans = TRUE; bool using_ignore= 0, using_opt_replace= 0, using_bin_log= mysql_bin_log.is_open(); delayed_row *row; @@ -2736,7 +2745,7 @@ bool Delayed_insert::handle_inserts(void) */ thd.binlog_query(THD::ROW_QUERY_TYPE, row->query.str, row->query.length, - FALSE, FALSE, errcode); + FALSE, FALSE, FALSE, errcode); thd.time_zone_used = backup_time_zone_used; thd.variables.time_zone = backup_time_zone; @@ -2804,9 +2813,11 @@ bool Delayed_insert::handle_inserts(void) or trigger. TODO: Move the logging to last in the sequence of rows. - */ - if (thd.current_stmt_binlog_row_based) - thd.binlog_flush_pending_rows_event(TRUE); + */ + has_trans= thd.lex->sql_command == SQLCOM_CREATE_TABLE || + table->file->has_transactions(); + if (thd.is_current_stmt_binlog_format_row()) + thd.binlog_flush_pending_rows_event(TRUE, has_trans); if ((error=table->file->extra(HA_EXTRA_NO_CACHE))) { // This shouldn't happen @@ -2870,19 +2881,6 @@ bool mysql_insert_select_prepare(THD *thd) DBUG_ENTER("mysql_insert_select_prepare"); /* - Statement-based replication of INSERT ... SELECT ... LIMIT is not safe - as order of rows is not defined, so in mixed mode we go to row-based. - - Note that we may consider a statement as safe if ORDER BY primary_key - is present or we SELECT a constant. However it may confuse users to - see very similiar statements replicated differently. - */ - if (lex->current_select->select_limit) - { - lex->set_stmt_unsafe(); - thd->set_current_stmt_binlog_row_based_if_mixed(); - } - /* SELECT_LEX do not belong to INSERT statement, so we can't add WHERE clause if table is VIEW */ @@ -3246,9 +3244,11 @@ bool select_insert::send_eof() and ha_autocommit_or_rollback. */ query_cache_invalidate3(thd, table, 1); - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } + + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + DBUG_ASSERT(trans_table || !changed || thd->transaction.stmt.modified_non_trans_table); @@ -3267,7 +3267,7 @@ bool select_insert::send_eof() errcode= query_error_code(thd, killed_status == THD::NOT_KILLED); thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(), thd->query_length(), - trans_table, FALSE, errcode); + trans_table, FALSE, FALSE, errcode); } table->file->ha_release_auto_increment(); @@ -3333,15 +3333,16 @@ void select_insert::abort() { transactional_table= table->file->has_transactions(); if (thd->transaction.stmt.modified_non_trans_table) { + if (!can_rollback_data()) + thd->transaction.all.modified_non_trans_table= TRUE; + if (mysql_bin_log.is_open()) { int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(), thd->query_length(), - transactional_table, FALSE, errcode); + transactional_table, FALSE, FALSE, errcode); } - if (!thd->current_stmt_binlog_row_based && !can_rollback_data()) - thd->transaction.all.modified_non_trans_table= TRUE; if (changed) query_cache_invalidate3(thd, table, 1); } @@ -3585,11 +3586,11 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) virtual int do_postlock(TABLE **tables, uint count) { THD *thd= const_cast<THD*>(ptr->get_thd()); - if (int error= decide_logging_format(thd, &all_tables)) + if (int error= thd->decide_logging_format(&all_tables)) return error; TABLE const *const table = *tables; - if (thd->current_stmt_binlog_row_based && + if (thd->is_current_stmt_binlog_format_row() && !table->s->tmp_table && !ptr->get_create_info()->table_existed) { @@ -3613,7 +3614,7 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) temporary table, we need to start a statement transaction. */ if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 && - thd->current_stmt_binlog_row_based && + thd->is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()) { thd->binlog_start_trans_and_stmt(); @@ -3632,7 +3633,7 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR), create_table->table_name); - if (thd->current_stmt_binlog_row_based) + if (thd->is_current_stmt_binlog_format_row()) binlog_show_create_table(&(create_table->table), 1); table= create_table->table; } @@ -3720,7 +3721,7 @@ select_create::binlog_show_create_table(TABLE **tables, uint count) schema that will do a close_thread_tables(), destroying the statement transaction cache. */ - DBUG_ASSERT(thd->current_stmt_binlog_row_based); + DBUG_ASSERT(thd->is_current_stmt_binlog_format_row()); DBUG_ASSERT(tables && *tables && count > 0); char buf[2048]; @@ -3742,6 +3743,7 @@ select_create::binlog_show_create_table(TABLE **tables, uint count) thd->binlog_query(THD::STMT_QUERY_TYPE, query.ptr(), query.length(), /* is_trans */ TRUE, + /* direct */ FALSE, /* suppress_use */ FALSE, errcode); } @@ -3760,7 +3762,7 @@ void select_create::send_error(uint errcode,const char *err) DBUG_PRINT("info", ("Current statement %s row-based", - thd->current_stmt_binlog_row_based ? "is" : "is NOT")); + thd->is_current_stmt_binlog_format_row() ? "is" : "is NOT")); DBUG_PRINT("info", ("Current table (at 0x%lu) %s a temporary (or non-existant) table", (ulong) table, @@ -3842,7 +3844,7 @@ void select_create::abort() select_insert::abort(); thd->transaction.stmt.modified_non_trans_table= FALSE; reenable_binlog(thd); - thd->binlog_flush_pending_rows_event(TRUE); + thd->binlog_flush_pending_rows_event(TRUE, TRUE); if (m_plock) { |