summaryrefslogtreecommitdiff
path: root/sql/sql_insert.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r--sql/sql_insert.cc94
1 files changed, 75 insertions, 19 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 4dcbf9af4a0..7779adbf303 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -2640,8 +2640,7 @@ void select_insert::send_error(uint errcode,const char *err)
If the creation of the table failed (due to a syntax error, for
example), no table will have been opened and therefore 'table'
will be NULL. In that case, we still need to execute the rollback
- and the end of the function to truncate the binary log, but we can
- skip all the intermediate steps.
+ and the end of the function.
*/
if (table)
{
@@ -2672,10 +2671,8 @@ void select_insert::send_error(uint errcode,const char *err)
if (!table->file->has_transactions())
{
if (mysql_bin_log.is_open())
- {
thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
table->file->has_transactions(), FALSE);
- }
if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table &&
!can_rollback_data())
thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
@@ -2943,6 +2940,24 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
TABLEOP_HOOKS *hook_ptr= NULL;
#ifdef HAVE_ROW_BASED_REPLICATION
+ /*
+ For row-based replication, the CREATE-SELECT statement is written
+ in two pieces: the first one contain the CREATE TABLE statement
+ necessary to create the table and the second part contain the rows
+ that should go into the table.
+
+ For non-temporary tables, the start of the CREATE-SELECT
+ implicitly commits the previous transaction, and all events
+ forming the statement will be stored the transaction cache. At end
+ of the statement, the entire statement is committed as a
+ transaction, and all events are written to the binary log.
+
+ On the master, the table is locked for the duration of the
+ statement, but since the CREATE part is replicated as a simple
+ statement, there is no way to lock the table for accesses on the
+ slave. Hence, we have to hold on to the CREATE part of the
+ statement until the statement has finished.
+ */
class MY_HOOKS : public TABLEOP_HOOKS {
public:
MY_HOOKS(select_create *x) : ptr(x) { }
@@ -2952,7 +2967,7 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
{
TABLE const *const table = *tables;
if (ptr->get_thd()->current_stmt_binlog_row_based &&
- table->s->tmp_table == NO_TMP_TABLE &&
+ !table->s->tmp_table &&
!ptr->get_create_info()->table_existed)
{
ptr->binlog_show_create_table(tables, count);
@@ -2970,9 +2985,9 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
#ifdef HAVE_ROW_BASED_REPLICATION
/*
- Start a statement transaction before the create if we are creating
- a non-temporary table and are using row-based replication for the
- statement.
+ Start a statement transaction before the create if we are using
+ row-based replication for the statement. If we are creating a
+ temporary table, we need to start a statement transaction.
*/
if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
thd->current_stmt_binlog_row_based)
@@ -3076,13 +3091,35 @@ void select_create::store_values(List<Item> &values)
void select_create::send_error(uint errcode,const char *err)
{
+ DBUG_ENTER("select_create::send_error");
+
+ DBUG_PRINT("info",
+ ("Current statement %s row-based",
+ thd->current_stmt_binlog_row_based ? "is" : "is NOT"));
+ DBUG_PRINT("info",
+ ("Current table (at 0x%lu) %s a temporary (or non-existant) table",
+ table,
+ table && !table->s->tmp_table ? "is NOT" : "is"));
+ DBUG_PRINT("info",
+ ("Table %s prior to executing this statement",
+ get_create_info()->table_existed ? "existed" : "did not exist"));
+
/*
- Disable binlog, because we "roll back" partial inserts in ::abort
- by removing the table, even for non-transactional tables.
+ This will execute any rollbacks that are necessary before writing
+ the transcation cache.
+
+ We disable the binary log since nothing should be written to the
+ binary log. This disabling is important, since we potentially do
+ a "roll back" of non-transactional tables by removing the table,
+ and the actual rollback might generate events that should not be
+ written to the binary log.
+
*/
tmp_disable_binlog(thd);
select_insert::send_error(errcode, err);
reenable_binlog(thd);
+
+ DBUG_VOID_RETURN;
}
@@ -3093,6 +3130,14 @@ bool select_create::send_eof()
abort();
else
{
+ /*
+ Do an implicit commit at end of statement for non-temporary
+ tables. This can fail, but we should unlock the table
+ nevertheless.
+ */
+ if (!table->s->tmp_table)
+ ha_commit(thd); // Can fail, but we proceed anyway
+
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
VOID(pthread_mutex_lock(&LOCK_open));
@@ -3111,12 +3156,31 @@ bool select_create::send_eof()
void select_create::abort()
{
+ DBUG_ENTER("select_create::abort");
VOID(pthread_mutex_lock(&LOCK_open));
+
+ /*
+ We roll back the statement, including truncating the transaction
+ cache of the binary log, if the statement failed.
+
+ We roll back the statement prior to deleting the table and prior
+ to releasing the lock on the table, since there might be potential
+ for failure if the rollback is executed after the drop or after
+ unlocking the table.
+
+ We also roll back the statement regardless of whether the creation
+ of the table succeeded or not, since we need to reset the binary
+ log state.
+ */
+ if (thd->current_stmt_binlog_row_based)
+ ha_rollback_stmt(thd);
+
if (thd->extra_lock)
{
mysql_unlock_tables(thd, thd->extra_lock);
thd->extra_lock=0;
}
+
if (table)
{
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
@@ -3128,17 +3192,8 @@ void select_create::abort()
table->s->version= 0;
hash_delete(&open_cache,(byte*) table);
if (!create_info->table_existed)
- {
quick_rm_table(table_type, create_table->db,
create_table->table_name, 0);
- /*
- We roll back the statement, including truncating the
- transaction cache of the binary log, if the statement
- failed.
- */
- if (thd->current_stmt_binlog_row_based)
- ha_rollback_stmt(thd);
- }
/* Tell threads waiting for refresh that something has happened */
if (version != refresh_version)
broadcast_refresh();
@@ -3148,6 +3203,7 @@ void select_create::abort()
table=0; // Safety
}
VOID(pthread_mutex_unlock(&LOCK_open));
+ DBUG_VOID_RETURN;
}