summaryrefslogtreecommitdiff
path: root/sql/sql_insert.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r--sql/sql_insert.cc92
1 files changed, 47 insertions, 45 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index e9a36629c66..dbec84b6d1a 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -872,6 +872,10 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
*/
query_cache_invalidate3(thd, table_list, 1);
}
+
+ if (thd->transaction.stmt.modified_non_trans_table)
+ thd->transaction.all.modified_non_trans_table= TRUE;
+
if ((changed && error <= 0) ||
thd->transaction.stmt.modified_non_trans_table ||
was_insert_delayed)
@@ -910,15 +914,13 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
*/
DBUG_ASSERT(thd->killed != THD::KILL_BAD_DATA || error > 0);
if (thd->binlog_query(THD::ROW_QUERY_TYPE,
- thd->query(), thd->query_length(),
- transactional_table, FALSE,
- errcode))
+ thd->query(), thd->query_length(),
+ transactional_table, FALSE, FALSE,
+ errcode))
{
error=1;
}
}
- if (thd->transaction.stmt.modified_non_trans_table)
- thd->transaction.all.modified_non_trans_table= TRUE;
}
DBUG_ASSERT(transactional_table || !changed ||
thd->transaction.stmt.modified_non_trans_table);
@@ -1739,6 +1741,7 @@ public:
table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
group_count(0)
{
+ DBUG_ENTER("Delayed_insert constructor");
thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user;
thd.security_ctx->host=(char*) my_localhost;
thd.current_tablenr=0;
@@ -1747,11 +1750,21 @@ public:
thd.lex->current_select= 0; // for my_message_sql
thd.lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
/*
- Statement-based replication of INSERT DELAYED has problems with RAND()
- and user vars, so in mixed mode we go to row-based.
+ Statement-based replication of INSERT DELAYED has problems with
+ RAND() and user variables, so in mixed mode we go to row-based.
+ For normal commands, the unsafe flag is set at parse time.
+ However, since the flag is a member of the THD object, of which
+ the delayed_insert thread has its own copy, we must set the
+ statement to unsafe here and explicitly set row logging mode.
+
+ @todo set_current_stmt_binlog_format_row_if_mixed should not be
+ called by anything else than thd->decide_logging_format(). When
+ we call set_current_blah here, none of the checks in
+ decide_logging_format is made. We should probably call
+ thd->decide_logging_format() directly instead. /Sven
*/
- thd.lex->set_stmt_unsafe();
- thd.set_current_stmt_binlog_row_based_if_mixed();
+ thd.lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_DELAYED);
+ thd.set_current_stmt_binlog_format_row_if_mixed();
bzero((char*) &thd.net, sizeof(thd.net)); // Safety
bzero((char*) &table_list, sizeof(table_list)); // Safety
@@ -1766,6 +1779,7 @@ public:
delayed_lock= global_system_variables.low_priority_updates ?
TL_WRITE_LOW_PRIORITY : TL_WRITE;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
+ DBUG_VOID_RETURN;
}
~Delayed_insert()
{
@@ -2310,12 +2324,6 @@ static void handle_delayed_insert_impl(THD *thd, Delayed_insert *di)
*/
lex_start(thd);
thd->lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
- /*
- Statement-based replication of INSERT DELAYED has problems with RAND()
- and user vars, so in mixed mode we go to row-based.
- */
- thd->lex->set_stmt_unsafe();
- thd->set_current_stmt_binlog_row_based_if_mixed();
/* Open table */
if (!(di->table= open_n_lock_single_table(thd, &di->table_list,
@@ -2584,6 +2592,7 @@ bool Delayed_insert::handle_inserts(void)
{
int error;
ulong max_rows;
+ bool has_trans = TRUE;
bool using_ignore= 0, using_opt_replace= 0,
using_bin_log= mysql_bin_log.is_open();
delayed_row *row;
@@ -2736,7 +2745,7 @@ bool Delayed_insert::handle_inserts(void)
*/
thd.binlog_query(THD::ROW_QUERY_TYPE,
row->query.str, row->query.length,
- FALSE, FALSE, errcode);
+ FALSE, FALSE, FALSE, errcode);
thd.time_zone_used = backup_time_zone_used;
thd.variables.time_zone = backup_time_zone;
@@ -2804,9 +2813,11 @@ bool Delayed_insert::handle_inserts(void)
or trigger.
TODO: Move the logging to last in the sequence of rows.
- */
- if (thd.current_stmt_binlog_row_based)
- thd.binlog_flush_pending_rows_event(TRUE);
+ */
+ has_trans= thd.lex->sql_command == SQLCOM_CREATE_TABLE ||
+ table->file->has_transactions();
+ if (thd.is_current_stmt_binlog_format_row())
+ thd.binlog_flush_pending_rows_event(TRUE, has_trans);
if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
{ // This shouldn't happen
@@ -2870,19 +2881,6 @@ bool mysql_insert_select_prepare(THD *thd)
DBUG_ENTER("mysql_insert_select_prepare");
/*
- Statement-based replication of INSERT ... SELECT ... LIMIT is not safe
- as order of rows is not defined, so in mixed mode we go to row-based.
-
- Note that we may consider a statement as safe if ORDER BY primary_key
- is present or we SELECT a constant. However it may confuse users to
- see very similiar statements replicated differently.
- */
- if (lex->current_select->select_limit)
- {
- lex->set_stmt_unsafe();
- thd->set_current_stmt_binlog_row_based_if_mixed();
- }
- /*
SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
clause if table is VIEW
*/
@@ -3246,9 +3244,11 @@ bool select_insert::send_eof()
and ha_autocommit_or_rollback.
*/
query_cache_invalidate3(thd, table, 1);
- if (thd->transaction.stmt.modified_non_trans_table)
- thd->transaction.all.modified_non_trans_table= TRUE;
}
+
+ if (thd->transaction.stmt.modified_non_trans_table)
+ thd->transaction.all.modified_non_trans_table= TRUE;
+
DBUG_ASSERT(trans_table || !changed ||
thd->transaction.stmt.modified_non_trans_table);
@@ -3267,7 +3267,7 @@ bool select_insert::send_eof()
errcode= query_error_code(thd, killed_status == THD::NOT_KILLED);
thd->binlog_query(THD::ROW_QUERY_TYPE,
thd->query(), thd->query_length(),
- trans_table, FALSE, errcode);
+ trans_table, FALSE, FALSE, errcode);
}
table->file->ha_release_auto_increment();
@@ -3333,15 +3333,16 @@ void select_insert::abort() {
transactional_table= table->file->has_transactions();
if (thd->transaction.stmt.modified_non_trans_table)
{
+ if (!can_rollback_data())
+ thd->transaction.all.modified_non_trans_table= TRUE;
+
if (mysql_bin_log.is_open())
{
int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(),
thd->query_length(),
- transactional_table, FALSE, errcode);
+ transactional_table, FALSE, FALSE, errcode);
}
- if (!thd->current_stmt_binlog_row_based && !can_rollback_data())
- thd->transaction.all.modified_non_trans_table= TRUE;
if (changed)
query_cache_invalidate3(thd, table, 1);
}
@@ -3585,11 +3586,11 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
virtual int do_postlock(TABLE **tables, uint count)
{
THD *thd= const_cast<THD*>(ptr->get_thd());
- if (int error= decide_logging_format(thd, &all_tables))
+ if (int error= thd->decide_logging_format(&all_tables))
return error;
TABLE const *const table = *tables;
- if (thd->current_stmt_binlog_row_based &&
+ if (thd->is_current_stmt_binlog_format_row() &&
!table->s->tmp_table &&
!ptr->get_create_info()->table_existed)
{
@@ -3613,7 +3614,7 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
temporary table, we need to start a statement transaction.
*/
if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
- thd->current_stmt_binlog_row_based &&
+ thd->is_current_stmt_binlog_format_row() &&
mysql_bin_log.is_open())
{
thd->binlog_start_trans_and_stmt();
@@ -3632,7 +3633,7 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
create_table->table_name);
- if (thd->current_stmt_binlog_row_based)
+ if (thd->is_current_stmt_binlog_format_row())
binlog_show_create_table(&(create_table->table), 1);
table= create_table->table;
}
@@ -3720,7 +3721,7 @@ select_create::binlog_show_create_table(TABLE **tables, uint count)
schema that will do a close_thread_tables(), destroying the
statement transaction cache.
*/
- DBUG_ASSERT(thd->current_stmt_binlog_row_based);
+ DBUG_ASSERT(thd->is_current_stmt_binlog_format_row());
DBUG_ASSERT(tables && *tables && count > 0);
char buf[2048];
@@ -3742,6 +3743,7 @@ select_create::binlog_show_create_table(TABLE **tables, uint count)
thd->binlog_query(THD::STMT_QUERY_TYPE,
query.ptr(), query.length(),
/* is_trans */ TRUE,
+ /* direct */ FALSE,
/* suppress_use */ FALSE,
errcode);
}
@@ -3760,7 +3762,7 @@ void select_create::send_error(uint errcode,const char *err)
DBUG_PRINT("info",
("Current statement %s row-based",
- thd->current_stmt_binlog_row_based ? "is" : "is NOT"));
+ thd->is_current_stmt_binlog_format_row() ? "is" : "is NOT"));
DBUG_PRINT("info",
("Current table (at 0x%lu) %s a temporary (or non-existant) table",
(ulong) table,
@@ -3842,7 +3844,7 @@ void select_create::abort()
select_insert::abort();
thd->transaction.stmt.modified_non_trans_table= FALSE;
reenable_binlog(thd);
- thd->binlog_flush_pending_rows_event(TRUE);
+ thd->binlog_flush_pending_rows_event(TRUE, TRUE);
if (m_plock)
{