summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <mats@mysql.com>2006-02-24 16:19:55 +0100
committerunknown <mats@mysql.com>2006-02-24 16:19:55 +0100
commit613fb54f95f440baa02ba31fbc317c2a880f3513 (patch)
treee45e91976dfa93a26c07b19e063f5073126ba2d2 /sql
parent738a1ca08db810aa37069da149aa21590292caac (diff)
downloadmariadb-git-613fb54f95f440baa02ba31fbc317c2a880f3513.tar.gz
WL#3023 (RBR: Use locks in a statement-like manner):
Adaptions to make it work with NDB. mysql-test/extra/binlog_tests/binlog.test: Using replace_regex to remove table id. mysql-test/extra/binlog_tests/blackhole.test: Using replace_regex to remove table id. mysql-test/extra/binlog_tests/ctype_cp932.test: Using replace_regex to remove table id. mysql-test/extra/binlog_tests/ctype_cp932_binlog.test: Using replace_regex to remove table id. mysql-test/extra/binlog_tests/ctype_ucs_binlog.test: Using replace_regex to remove table id. mysql-test/extra/binlog_tests/drop_temp_table.test: Using replace_regex to remove table id. mysql-test/extra/binlog_tests/insert_select-binlog.test: Using replace_regex to remove table id. mysql-test/extra/binlog_tests/mix_innodb_myisam_binlog.test: Using replace_regex to remove table id. mysql-test/extra/rpl_tests/rpl_flsh_tbls.test: Using replace_regex to remove table id. mysql-test/extra/rpl_tests/rpl_log.test: Using replace_regex to remove table id. mysql-test/extra/rpl_tests/rpl_multi_query.test: Using replace_regex to remove table id. mysql-test/extra/rpl_tests/rpl_row_charset.test: Using replace_regex to remove table id. mysql-test/extra/rpl_tests/rpl_row_delayed_ins.test: Using replace_regex to remove table id. mysql-test/extra/rpl_tests/rpl_stm_charset.test: Using replace_regex to remove table id. mysql-test/include/rpl_row_basic.inc: Removing sync with master on cleanup since there are engines that does not work in a "syncronized" fashion on dropping tables. mysql-test/r/binlog_row_binlog.result: Result change mysql-test/r/binlog_row_blackhole.result: Result change mysql-test/r/binlog_row_ctype_cp932.result: Result change mysql-test/r/binlog_row_ctype_ucs.result: Result change mysql-test/r/binlog_row_insert_select.result: Result change mysql-test/r/binlog_row_mix_innodb_myisam.result: Result change mysql-test/r/ctype_cp932_binlog_row.result: Result change mysql-test/r/rpl_row_basic_11bugs.result: Result change mysql-test/r/rpl_row_basic_2myisam.result: Result change mysql-test/r/rpl_row_basic_3innodb.result: Result change mysql-test/r/rpl_row_charset.result: Result change mysql-test/r/rpl_row_create_table.result: Result change mysql-test/r/rpl_row_delayed_ins.result: Result change mysql-test/r/rpl_row_log.result: Result change mysql-test/r/rpl_row_log_innodb.result: Result change mysql-test/r/rpl_row_max_relay_size.result: Result change mysql-test/r/rpl_row_sp008.result: Result change mysql-test/t/binlog_stm_binlog.test: Using replace_regex to remove table id. mysql-test/t/ndb_binlog_ddl_multi.test: Using replace_regex to remove table id. mysql-test/t/ndb_binlog_ignore_db.test: Using replace_regex to remove table id. mysql-test/t/rpl_heap.test: Using replace_regex to remove table id. mysql-test/t/rpl_loaddata_s.test: Using replace_regex to remove table id. mysql-test/t/rpl_ndb_blob.test: Using replace_regex to remove table id. mysql-test/t/rpl_ndb_disk.test: Using replace_regex to remove table id. mysql-test/t/rpl_row_basic_11bugs.test: Using replace_regex to remove table id. mysql-test/t/rpl_row_create_table.test: Using replace_regex to remove table id. mysql-test/t/rpl_row_drop.test: Using replace_regex to remove table id. mysql-test/t/rpl_row_sp008.test: Using replace_regex to remove table id. mysql-test/t/rpl_sp.test: Using replace_regex to remove table id. mysql-test/t/sp.test: Using replace_regex to remove table id. mysql-test/t/sp_notembedded.test: Using replace_regex to remove table id. mysql-test/t/user_var-binlog.test: Using replace_regex to remove table id. sql/ha_ndbcluster_binlog.cc: Assign_new_table_id() now takes table share. Removed gratuitous friend declaration of ndb_add_binlog_index(). Turning of binlogging during execution of ndb_add_binlog_index(). sql/handler.h: Removed gratuitous friend declaration of ndb_add_binlog_index(). sql/log.cc: Adding debug printout. sql/log_event.cc: Closing thread tables on dummy event. sql/rpl_injector.cc: Added support for new locking scheme. sql/rpl_injector.h: Added support for new locking scheme.
Diffstat (limited to 'sql')
-rw-r--r--sql/ha_ndbcluster_binlog.cc17
-rw-r--r--sql/handler.h2
-rw-r--r--sql/log.cc2
-rw-r--r--sql/log_event.cc17
-rw-r--r--sql/rpl_injector.cc29
-rw-r--r--sql/rpl_injector.h74
6 files changed, 123 insertions, 18 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
index 06f946b013c..52ab3afcba1 100644
--- a/sql/ha_ndbcluster_binlog.cc
+++ b/sql/ha_ndbcluster_binlog.cc
@@ -310,7 +310,7 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
table= 0;
break;
}
- assign_new_table_id(table);
+ assign_new_table_id(table_share);
if (!table->record[1] || table->record[1] == table->record[0])
{
table->record[1]= alloc_root(&table->mem_root,
@@ -1620,15 +1620,20 @@ static int open_binlog_index(THD *thd, TABLE_LIST *tables,
/*
Insert one row in the cluster_replication.binlog_index
-
- declared friend in handler.h to be able to call write_row directly
- so that this insert is not replicated
*/
int ndb_add_binlog_index(THD *thd, void *_row)
{
Binlog_index_row &row= *(Binlog_index_row *) _row;
int error= 0;
bool need_reopen;
+
+ /*
+ Turn of binlogging to prevent the table changes to be written to
+ the binary log.
+ */
+ ulong saved_options= thd->options;
+ thd->options&= ~(OPTION_BIN_LOG);
+
for ( ; ; ) /* loop for need_reopen */
{
if (!binlog_index && open_binlog_index(thd, &binlog_tables, &binlog_index))
@@ -1663,7 +1668,7 @@ int ndb_add_binlog_index(THD *thd, void *_row)
binlog_index->field[6]->store(row.n_schemaops);
int r;
- if ((r= binlog_index->file->write_row(binlog_index->record[0])))
+ if ((r= binlog_index->file->ha_write_row(binlog_index->record[0])))
{
sql_print_error("NDB Binlog: Writing row to binlog_index: %d", r);
error= -1;
@@ -1672,10 +1677,12 @@ int ndb_add_binlog_index(THD *thd, void *_row)
mysql_unlock_tables(thd, thd->lock);
thd->lock= 0;
+ thd->options= saved_options;
return 0;
add_binlog_index_err:
close_thread_tables(thd);
binlog_index= 0;
+ thd->options= saved_options;
return error;
}
diff --git a/sql/handler.h b/sql/handler.h
index dd445637b9f..37bf5335077 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -1907,8 +1907,6 @@ public:
{ return COMPATIBLE_DATA_NO; }
private:
- friend int ndb_add_binlog_index(THD *, void *);
-
/*
Row-level primitives for storage engines. These should be
overridden by the storage engine class. To call these methods, use
diff --git a/sql/log.cc b/sql/log.cc
index c2fadb9d845..2b75bda2d70 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -2680,6 +2680,8 @@ int MYSQL_LOG::flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event)
DBUG_ASSERT(trx_data);
+ DBUG_PRINT("info", ("trx_data->pending=%p", trx_data->pending));
+
if (Rows_log_event* pending= trx_data->pending)
{
IO_CACHE *file= &log_file;
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 9fbb29bbc8c..944190c6d20 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -5268,8 +5268,8 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
/*
If m_table_id == ULONG_MAX, then we have a dummy event that does
not contain any data. In that case, we just remove all tables in
- the tables_to_lock list, step the relay log position, and return
- with success.
+ the tables_to_lock list, close the thread tables, step the relay
+ log position, and return with success.
*/
if (m_table_id == ULONG_MAX)
{
@@ -5280,6 +5280,7 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
DBUG_ASSERT(get_flags(STMT_END_F));
rli->clear_tables_to_lock();
+ close_thread_tables(thd);
thd->clear_error();
rli->inc_event_relay_log_pos();
DBUG_RETURN(0);
@@ -5414,12 +5415,16 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
DBUG_ASSERT(row_end != NULL); // cannot happen
DBUG_ASSERT(row_end <= (const char*)m_rows_end);
+#if 0
/* in_use can have been set to NULL in close_tables_for_reopen */
THD* old_thd= table->in_use;
if (!table->in_use)
table->in_use= thd;
+#endif
error= do_exec_row(table);
+#if 0
table->in_use = old_thd;
+#endif
switch (error)
{
/* Some recoverable errors */
@@ -5599,14 +5604,12 @@ bool Rows_log_event::write_data_body(IO_CACHE*file)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
void Rows_log_event::pack_info(Protocol *protocol)
{
-#ifdef DBUG_RBR
char buf[256];
char const *const flagstr=
get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
my_size_t bytes= snprintf(buf, sizeof(buf),
"table_id: %lu%s", m_table_id, flagstr);
protocol->store(buf, bytes, &my_charset_bin);
-#endif
}
#endif
@@ -6009,17 +6012,11 @@ bool Table_map_log_event::write_data_body(IO_CACHE *file)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
void Table_map_log_event::pack_info(Protocol *protocol)
{
-#ifdef DBUG_RBR
char buf[256];
my_size_t bytes= snprintf(buf, sizeof(buf),
"table_id: %lu (%s.%s)",
m_table_id, m_dbnam, m_tblnam);
protocol->store(buf, bytes, &my_charset_bin);
-#else
- char buf[256];
- my_size_t bytes= snprintf(buf, sizeof(buf), "%s.%s", m_dbnam, m_tblnam);
- protocol->store(buf, bytes, &my_charset_bin);
-#endif
}
#endif
diff --git a/sql/rpl_injector.cc b/sql/rpl_injector.cc
index a69dea9a158..a69cfc2b75f 100644
--- a/sql/rpl_injector.cc
+++ b/sql/rpl_injector.cc
@@ -26,7 +26,7 @@
/* inline since it's called below */
inline
injector::transaction::transaction(MYSQL_LOG *log, THD *thd)
- : m_thd(thd)
+ : m_state(START_STATE), m_thd(thd)
{
/*
Default initialization of m_start_pos (which initializes it to garbage).
@@ -64,12 +64,31 @@ int injector::transaction::commit()
DBUG_RETURN(0);
}
+int injector::transaction::use_table(server_id_type sid, table tbl)
+{
+ DBUG_ENTER("injector::transaction::use_table");
+
+ int error;
+
+ if ((error= check_state(TABLE_STATE)))
+ DBUG_RETURN(error);
+
+ m_thd->set_server_id(sid);
+ error= m_thd->binlog_write_table_map(tbl.get_table(),
+ tbl.is_transactional());
+ DBUG_RETURN(error);
+}
+
int injector::transaction::write_row (server_id_type sid, table tbl,
MY_BITMAP const* cols, size_t colcnt,
record_type record)
{
DBUG_ENTER("injector::transaction::write_row(...)");
+
+ if (int error= check_state(ROW_STATE))
+ DBUG_RETURN(error);
+
m_thd->set_server_id(sid);
m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(),
cols, colcnt, record);
@@ -82,6 +101,10 @@ int injector::transaction::delete_row(server_id_type sid, table tbl,
record_type record)
{
DBUG_ENTER("injector::transaction::delete_row(...)");
+
+ if (int error= check_state(ROW_STATE))
+ DBUG_RETURN(error);
+
m_thd->set_server_id(sid);
m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(),
cols, colcnt, record);
@@ -94,6 +117,10 @@ int injector::transaction::update_row(server_id_type sid, table tbl,
record_type before, record_type after)
{
DBUG_ENTER("injector::transaction::update_row(...)");
+
+ if (int error= check_state(ROW_STATE))
+ DBUG_RETURN(error);
+
m_thd->set_server_id(sid);
m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(),
cols, colcnt, before, after);
diff --git a/sql/rpl_injector.h b/sql/rpl_injector.h
index 32d3fdd1a78..50a0a56dd9b 100644
--- a/sql/rpl_injector.h
+++ b/sql/rpl_injector.h
@@ -160,6 +160,24 @@ public:
}
/*
+
+ DESCRIPTION
+
+ Register table for use within the transaction. All tables
+ that are going to be used need to be registered before being
+ used below. The member function will fail with an error if
+ use_table() is called after any *_row() function has been
+ called for the transaction.
+
+ RETURN VALUE
+
+ 0 All OK
+ >0 Failure
+
+ */
+ int use_table(server_id_type sid, table tbl);
+
+ /*
Add a 'write row' entry to the transaction.
*/
int write_row (server_id_type sid, table tbl,
@@ -219,6 +237,62 @@ public:
}
}
+ enum enum_state
+ {
+ START_STATE, /* Start state */
+ TABLE_STATE, /* At least one table has been registered */
+ ROW_STATE, /* At least one row has been registered */
+ STATE_COUNT /* State count and sink state */
+ } m_state;
+
+ /*
+ Check and update the state.
+
+ PARAMETER(S)
+
+ target_state
+ The state we are moving to: TABLE_STATE if we are
+ writing a table and ROW_STATE if we are writing a row.
+
+ DESCRIPTION
+
+ The internal state will be updated to the target state if
+ and only if it is a legal move. The only legal moves are:
+
+ START_STATE -> START_STATE
+ START_STATE -> TABLE_STATE
+ TABLE_STATE -> TABLE_STATE
+ TABLE_STATE -> ROW_STATE
+
+ That is:
+ - It is not possible to write any row before having written at
+ least one table
+ - It is not possible to write a table after at least one row
+ has been written
+
+ RETURN VALUE
+
+ 0 All OK
+ -1 Incorrect call sequence
+ */
+ int check_state(enum_state const target_state)
+ {
+ static char const *state_name[] = {
+ "START_STATE", "TABLE_STATE", "ROW_STATE", "STATE_COUNT"
+ };
+
+ DBUG_ASSERT(0 <= target_state && target_state <= STATE_COUNT);
+ DBUG_PRINT("info", ("In state %s", state_name[m_state]));
+
+ if (m_state <= target_state && target_state <= m_state + 1 &&
+ m_state < STATE_COUNT)
+ m_state= target_state;
+ else
+ m_state= STATE_COUNT;
+ return m_state == STATE_COUNT ? 1 : 0;
+ }
+
+
binlog_pos m_start_pos;
THD *m_thd;
};