diff options
-rw-r--r-- | .bzrignore | 1 | ||||
-rw-r--r-- | mysql-test/r/rpl_ndb_stm_innodb.result | 37 | ||||
-rw-r--r-- | mysql-test/t/rpl_ndb_stm_innodb-master.opt | 1 | ||||
-rw-r--r-- | mysql-test/t/rpl_ndb_stm_innodb.test | 62 | ||||
-rw-r--r-- | sql/ha_ndbcluster.cc | 64 | ||||
-rw-r--r-- | sql/ha_ndbcluster.h | 6 | ||||
-rw-r--r-- | sql/ha_ndbcluster_binlog.cc | 4 | ||||
-rw-r--r-- | sql/ha_ndbcluster_binlog.h | 2 |
8 files changed, 174 insertions, 3 deletions
diff --git a/.bzrignore b/.bzrignore index c04ce91b355..55f55faa0e6 100644 --- a/.bzrignore +++ b/.bzrignore @@ -2956,3 +2956,4 @@ win/vs71cache.txt win/vs8cache.txt zlib/*.ds? zlib/*.vcproj +client/rpl_constants.h diff --git a/mysql-test/r/rpl_ndb_stm_innodb.result b/mysql-test/r/rpl_ndb_stm_innodb.result new file mode 100644 index 00000000000..9ed54a11c1c --- /dev/null +++ b/mysql-test/r/rpl_ndb_stm_innodb.result @@ -0,0 +1,37 @@ +stop slave; +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; +reset master; +reset slave; +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; +start slave; +create table t1 (a int key, b int) engine innodb; +create table t2 (a int key, b int) engine innodb; +alter table t1 engine ndb; +alter table t2 engine ndb; +insert into t1 values (1,2); +select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status; +@start_pos:=start_pos @end_pos:=end_pos +<start_pos> <end_pos> +show binlog events from <start_pos> limit 1; +Log_name Pos Event_type Server_id End_log_pos Info +master-bin.000001 <start_pos> Query 1 # use `test`; insert into t1 values (1,2) +show binlog events from <start_pos> limit 1,1; +Log_name Pos Event_type Server_id End_log_pos Info +master-bin.000001 # Xid 1 445 COMMIT /* XID */ +begin; +insert into t1 values (2,3); +insert into t2 values (3,4); +commit; +select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status; +@start_pos:=start_pos @end_pos:=end_pos +<start_pos> <end_pos> +show binlog events from <start_pos> limit 1; +Log_name Pos Event_type Server_id End_log_pos Info +master-bin.000001 <start_pos> Query 1 # use `test`; BEGIN +show binlog events from <start_pos> limit 1,2; +Log_name Pos Event_type Server_id End_log_pos Info +master-bin.000001 # Query # # use `test`; insert into t1 values (2,3) +master-bin.000001 # Query # # use `test`; insert into t2 values (3,4) +show binlog events from <start_pos> limit 3,1; +Log_name Pos Event_type Server_id End_log_pos Info +master-bin.000001 # Xid 1 <end_pos> COMMIT /* XID */ diff --git a/mysql-test/t/rpl_ndb_stm_innodb-master.opt b/mysql-test/t/rpl_ndb_stm_innodb-master.opt new file mode 100644 index 00000000000..627becdbfb5 --- /dev/null +++ b/mysql-test/t/rpl_ndb_stm_innodb-master.opt @@ -0,0 +1 @@ +--innodb diff --git a/mysql-test/t/rpl_ndb_stm_innodb.test b/mysql-test/t/rpl_ndb_stm_innodb.test new file mode 100644 index 00000000000..b92fbbcfce6 --- /dev/null +++ b/mysql-test/t/rpl_ndb_stm_innodb.test @@ -0,0 +1,62 @@ +--source include/have_ndb.inc +--source include/have_innodb.inc +--source include/have_binlog_format_mixed_or_statement.inc +--source include/master-slave.inc + +--connection master +create table t1 (a int key, b int) engine innodb; +create table t2 (a int key, b int) engine innodb; + +--sync_slave_with_master +--connection slave +alter table t1 engine ndb; +alter table t2 engine ndb; + +# check binlog position without begin +--connection master +insert into t1 values (1,2); + +--sync_slave_with_master +--connection slave +--replace_column 1 <start_pos> 2 <end_pos> +select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status; +--let $start_pos = `select @start_pos` +--let $end_pos = `select @end_pos` + +--connection master +# here is actually a bug, since there is no begin statement, the +# query is autocommitted, and end_pos shows end of the insert and not +# end of the commit +--replace_result $start_pos <start_pos> +--replace_column 5 # +--eval show binlog events from $start_pos limit 1 +--replace_result $start_pos <start_pos> $end_pos <end_pos> +--replace_column 2 # +--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/ +--eval show binlog events from $start_pos limit 1,1 + +# check binlog position with begin +--connection master +begin; +insert into t1 values (2,3); +insert into t2 values (3,4); +commit; + +--sync_slave_with_master +--connection slave +--replace_column 1 <start_pos> 2 <end_pos> +select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status; +--let $start_pos = `select @start_pos` +--let $end_pos = `select @end_pos` + +--connection master +--replace_result $start_pos <start_pos> +--replace_column 5 # +--eval show binlog events from $start_pos limit 1 +--replace_result $start_pos <start_pos> +--replace_column 2 # 4 # 5 # +--eval show binlog events from $start_pos limit 1,2 +--replace_result $start_pos <start_pos> $end_pos <end_pos> +--replace_column 2 # +--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/ +--eval show binlog events from $start_pos limit 3,1 diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 353dd806448..416cd18d924 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -4123,6 +4123,58 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd, - refresh list of the indexes for the table if needed (if altered) */ +#ifdef HAVE_NDB_BINLOG +extern MASTER_INFO *active_mi; +static int ndbcluster_update_apply_status(THD *thd, int do_update) +{ + Thd_ndb *thd_ndb= get_thd_ndb(thd); + Ndb *ndb= thd_ndb->ndb; + NDBDICT *dict= ndb->getDictionary(); + const NDBTAB *ndbtab; + NdbTransaction *trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt; + ndb->setDatabaseName(NDB_REP_DB); + Ndb_table_guard ndbtab_g(dict, NDB_APPLY_TABLE); + if (!(ndbtab= ndbtab_g.get_table())) + { + return -1; + } + NdbOperation *op= 0; + int r= 0; + r|= (op= trans->getNdbOperation(ndbtab)) == 0; + DBUG_ASSERT(r == 0); + if (do_update) + r|= op->updateTuple(); + else + r|= op->writeTuple(); + DBUG_ASSERT(r == 0); + // server_id + r|= op->equal(0u, (Uint64)thd->server_id); + DBUG_ASSERT(r == 0); + if (!do_update) + { + // epoch + r|= op->setValue(1u, (Uint64)0); + DBUG_ASSERT(r == 0); + } + // log_name + char tmp_buf[FN_REFLEN]; + ndb_pack_varchar(ndbtab->getColumn(2u), tmp_buf, + active_mi->rli.group_master_log_name, + strlen(active_mi->rli.group_master_log_name)); + r|= op->setValue(2u, tmp_buf); + DBUG_ASSERT(r == 0); + // start_pos + r|= op->setValue(3u, (Uint64)active_mi->rli.group_master_log_pos); + DBUG_ASSERT(r == 0); + // end_pos + r|= op->setValue(4u, (Uint64)active_mi->rli.group_master_log_pos + + ((Uint64)active_mi->rli.future_event_relay_log_pos - + (Uint64)active_mi->rli.group_relay_log_pos)); + DBUG_ASSERT(r == 0); + return 0; +} +#endif /* HAVE_NDB_BINLOG */ + int ha_ndbcluster::external_lock(THD *thd, int lock_type) { int error=0; @@ -4173,6 +4225,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) thd_ndb->init_open_tables(); thd_ndb->stmt= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; + thd_ndb->trans_options= 0; trans_register_ha(thd, FALSE, ndbcluster_hton); } else @@ -4189,6 +4242,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) thd_ndb->init_open_tables(); thd_ndb->all= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; + thd_ndb->trans_options= 0; trans_register_ha(thd, TRUE, ndbcluster_hton); /* @@ -4229,7 +4283,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) // Start of transaction m_rows_changed= 0; m_ops_pending= 0; - +#ifdef HAVE_NDB_BINLOG + if (m_share == ndb_apply_status_share && thd->slave_thread) + thd_ndb->trans_options|= TNTO_INJECTED_APPLY_STATUS; +#endif // TODO remove double pointers... m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table); m_table_info= &m_thd_ndb_share->stat; @@ -4372,6 +4429,11 @@ static int ndbcluster_commit(handlerton *hton, THD *thd, bool all) "stmt" : "all")); DBUG_ASSERT(ndb && trans); +#ifdef HAVE_NDB_BINLOG + if (thd->slave_thread) + ndbcluster_update_apply_status(thd, thd_ndb->trans_options & TNTO_INJECTED_APPLY_STATUS); +#endif /* HAVE_NDB_BINLOG */ + if (execute_commit(thd,trans) != 0) { const NdbError err= trans->getNdbError(); diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h index 6cc0e423f2f..98f6efe15d8 100644 --- a/sql/ha_ndbcluster.h +++ b/sql/ha_ndbcluster.h @@ -593,6 +593,11 @@ enum THD_NDB_OPTIONS TNO_NO_LOG_SCHEMA_OP= 1 << 0 }; +enum THD_NDB_TRANS_OPTIONS +{ + TNTO_INJECTED_APPLY_STATUS= 1 << 0 +}; + struct Ndb_local_table_statistics { int no_uncommitted_rows_count; ulong last_count; @@ -620,6 +625,7 @@ class Thd_ndb NdbTransaction *stmt; int error; uint32 options; + uint32 trans_options; List<NDB_SHARE> changed_tables; uint query_state; HASH open_tables; diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index 8b51ac68e66..060bc0beff4 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -953,8 +953,8 @@ static void ndbcluster_get_schema(NDB_SHARE *share, /* helper function to pack a ndb varchar */ -static char *ndb_pack_varchar(const NDBCOL *col, char *buf, - const char *str, int sz) +char *ndb_pack_varchar(const NDBCOL *col, char *buf, + const char *str, int sz) { switch (col->getArrayType()) { diff --git a/sql/ha_ndbcluster_binlog.h b/sql/ha_ndbcluster_binlog.h index 00fc689f061..7864cf3c0aa 100644 --- a/sql/ha_ndbcluster_binlog.h +++ b/sql/ha_ndbcluster_binlog.h @@ -181,6 +181,8 @@ int ndbcluster_find_all_files(THD *thd); void ndb_unpack_record(TABLE *table, NdbValue *value, MY_BITMAP *defined, byte *buf); +char *ndb_pack_varchar(const NDBCOL *col, char *buf, + const char *str, int sz); NDB_SHARE *ndbcluster_get_share(const char *key, TABLE *table, |