summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.bzrignore1
-rw-r--r--mysql-test/r/rpl_ndb_stm_innodb.result37
-rw-r--r--mysql-test/t/rpl_ndb_stm_innodb-master.opt1
-rw-r--r--mysql-test/t/rpl_ndb_stm_innodb.test62
-rw-r--r--sql/ha_ndbcluster.cc64
-rw-r--r--sql/ha_ndbcluster.h6
-rw-r--r--sql/ha_ndbcluster_binlog.cc4
-rw-r--r--sql/ha_ndbcluster_binlog.h2
8 files changed, 174 insertions, 3 deletions
diff --git a/.bzrignore b/.bzrignore
index c04ce91b355..55f55faa0e6 100644
--- a/.bzrignore
+++ b/.bzrignore
@@ -2956,3 +2956,4 @@ win/vs71cache.txt
win/vs8cache.txt
zlib/*.ds?
zlib/*.vcproj
+client/rpl_constants.h
diff --git a/mysql-test/r/rpl_ndb_stm_innodb.result b/mysql-test/r/rpl_ndb_stm_innodb.result
new file mode 100644
index 00000000000..9ed54a11c1c
--- /dev/null
+++ b/mysql-test/r/rpl_ndb_stm_innodb.result
@@ -0,0 +1,37 @@
+stop slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+reset master;
+reset slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+start slave;
+create table t1 (a int key, b int) engine innodb;
+create table t2 (a int key, b int) engine innodb;
+alter table t1 engine ndb;
+alter table t2 engine ndb;
+insert into t1 values (1,2);
+select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status;
+@start_pos:=start_pos @end_pos:=end_pos
+<start_pos> <end_pos>
+show binlog events from <start_pos> limit 1;
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin.000001 <start_pos> Query 1 # use `test`; insert into t1 values (1,2)
+show binlog events from <start_pos> limit 1,1;
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin.000001 # Xid 1 445 COMMIT /* XID */
+begin;
+insert into t1 values (2,3);
+insert into t2 values (3,4);
+commit;
+select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status;
+@start_pos:=start_pos @end_pos:=end_pos
+<start_pos> <end_pos>
+show binlog events from <start_pos> limit 1;
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin.000001 <start_pos> Query 1 # use `test`; BEGIN
+show binlog events from <start_pos> limit 1,2;
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin.000001 # Query # # use `test`; insert into t1 values (2,3)
+master-bin.000001 # Query # # use `test`; insert into t2 values (3,4)
+show binlog events from <start_pos> limit 3,1;
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin.000001 # Xid 1 <end_pos> COMMIT /* XID */
diff --git a/mysql-test/t/rpl_ndb_stm_innodb-master.opt b/mysql-test/t/rpl_ndb_stm_innodb-master.opt
new file mode 100644
index 00000000000..627becdbfb5
--- /dev/null
+++ b/mysql-test/t/rpl_ndb_stm_innodb-master.opt
@@ -0,0 +1 @@
+--innodb
diff --git a/mysql-test/t/rpl_ndb_stm_innodb.test b/mysql-test/t/rpl_ndb_stm_innodb.test
new file mode 100644
index 00000000000..b92fbbcfce6
--- /dev/null
+++ b/mysql-test/t/rpl_ndb_stm_innodb.test
@@ -0,0 +1,62 @@
+--source include/have_ndb.inc
+--source include/have_innodb.inc
+--source include/have_binlog_format_mixed_or_statement.inc
+--source include/master-slave.inc
+
+--connection master
+create table t1 (a int key, b int) engine innodb;
+create table t2 (a int key, b int) engine innodb;
+
+--sync_slave_with_master
+--connection slave
+alter table t1 engine ndb;
+alter table t2 engine ndb;
+
+# check binlog position without begin
+--connection master
+insert into t1 values (1,2);
+
+--sync_slave_with_master
+--connection slave
+--replace_column 1 <start_pos> 2 <end_pos>
+select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status;
+--let $start_pos = `select @start_pos`
+--let $end_pos = `select @end_pos`
+
+--connection master
+# here is actually a bug, since there is no begin statement, the
+# query is autocommitted, and end_pos shows end of the insert and not
+# end of the commit
+--replace_result $start_pos <start_pos>
+--replace_column 5 #
+--eval show binlog events from $start_pos limit 1
+--replace_result $start_pos <start_pos> $end_pos <end_pos>
+--replace_column 2 #
+--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/
+--eval show binlog events from $start_pos limit 1,1
+
+# check binlog position with begin
+--connection master
+begin;
+insert into t1 values (2,3);
+insert into t2 values (3,4);
+commit;
+
+--sync_slave_with_master
+--connection slave
+--replace_column 1 <start_pos> 2 <end_pos>
+select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status;
+--let $start_pos = `select @start_pos`
+--let $end_pos = `select @end_pos`
+
+--connection master
+--replace_result $start_pos <start_pos>
+--replace_column 5 #
+--eval show binlog events from $start_pos limit 1
+--replace_result $start_pos <start_pos>
+--replace_column 2 # 4 # 5 #
+--eval show binlog events from $start_pos limit 1,2
+--replace_result $start_pos <start_pos> $end_pos <end_pos>
+--replace_column 2 #
+--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/
+--eval show binlog events from $start_pos limit 3,1
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 353dd806448..416cd18d924 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -4123,6 +4123,58 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
- refresh list of the indexes for the table if needed (if altered)
*/
+#ifdef HAVE_NDB_BINLOG
+extern MASTER_INFO *active_mi;
+static int ndbcluster_update_apply_status(THD *thd, int do_update)
+{
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
+ Ndb *ndb= thd_ndb->ndb;
+ NDBDICT *dict= ndb->getDictionary();
+ const NDBTAB *ndbtab;
+ NdbTransaction *trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt;
+ ndb->setDatabaseName(NDB_REP_DB);
+ Ndb_table_guard ndbtab_g(dict, NDB_APPLY_TABLE);
+ if (!(ndbtab= ndbtab_g.get_table()))
+ {
+ return -1;
+ }
+ NdbOperation *op= 0;
+ int r= 0;
+ r|= (op= trans->getNdbOperation(ndbtab)) == 0;
+ DBUG_ASSERT(r == 0);
+ if (do_update)
+ r|= op->updateTuple();
+ else
+ r|= op->writeTuple();
+ DBUG_ASSERT(r == 0);
+ // server_id
+ r|= op->equal(0u, (Uint64)thd->server_id);
+ DBUG_ASSERT(r == 0);
+ if (!do_update)
+ {
+ // epoch
+ r|= op->setValue(1u, (Uint64)0);
+ DBUG_ASSERT(r == 0);
+ }
+ // log_name
+ char tmp_buf[FN_REFLEN];
+ ndb_pack_varchar(ndbtab->getColumn(2u), tmp_buf,
+ active_mi->rli.group_master_log_name,
+ strlen(active_mi->rli.group_master_log_name));
+ r|= op->setValue(2u, tmp_buf);
+ DBUG_ASSERT(r == 0);
+ // start_pos
+ r|= op->setValue(3u, (Uint64)active_mi->rli.group_master_log_pos);
+ DBUG_ASSERT(r == 0);
+ // end_pos
+ r|= op->setValue(4u, (Uint64)active_mi->rli.group_master_log_pos +
+ ((Uint64)active_mi->rli.future_event_relay_log_pos -
+ (Uint64)active_mi->rli.group_relay_log_pos));
+ DBUG_ASSERT(r == 0);
+ return 0;
+}
+#endif /* HAVE_NDB_BINLOG */
+
int ha_ndbcluster::external_lock(THD *thd, int lock_type)
{
int error=0;
@@ -4173,6 +4225,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
thd_ndb->init_open_tables();
thd_ndb->stmt= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
+ thd_ndb->trans_options= 0;
trans_register_ha(thd, FALSE, ndbcluster_hton);
}
else
@@ -4189,6 +4242,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
thd_ndb->init_open_tables();
thd_ndb->all= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
+ thd_ndb->trans_options= 0;
trans_register_ha(thd, TRUE, ndbcluster_hton);
/*
@@ -4229,7 +4283,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
// Start of transaction
m_rows_changed= 0;
m_ops_pending= 0;
-
+#ifdef HAVE_NDB_BINLOG
+ if (m_share == ndb_apply_status_share && thd->slave_thread)
+ thd_ndb->trans_options|= TNTO_INJECTED_APPLY_STATUS;
+#endif
// TODO remove double pointers...
m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table);
m_table_info= &m_thd_ndb_share->stat;
@@ -4372,6 +4429,11 @@ static int ndbcluster_commit(handlerton *hton, THD *thd, bool all)
"stmt" : "all"));
DBUG_ASSERT(ndb && trans);
+#ifdef HAVE_NDB_BINLOG
+ if (thd->slave_thread)
+ ndbcluster_update_apply_status(thd, thd_ndb->trans_options & TNTO_INJECTED_APPLY_STATUS);
+#endif /* HAVE_NDB_BINLOG */
+
if (execute_commit(thd,trans) != 0)
{
const NdbError err= trans->getNdbError();
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
index 6cc0e423f2f..98f6efe15d8 100644
--- a/sql/ha_ndbcluster.h
+++ b/sql/ha_ndbcluster.h
@@ -593,6 +593,11 @@ enum THD_NDB_OPTIONS
TNO_NO_LOG_SCHEMA_OP= 1 << 0
};
+enum THD_NDB_TRANS_OPTIONS
+{
+ TNTO_INJECTED_APPLY_STATUS= 1 << 0
+};
+
struct Ndb_local_table_statistics {
int no_uncommitted_rows_count;
ulong last_count;
@@ -620,6 +625,7 @@ class Thd_ndb
NdbTransaction *stmt;
int error;
uint32 options;
+ uint32 trans_options;
List<NDB_SHARE> changed_tables;
uint query_state;
HASH open_tables;
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
index 8b51ac68e66..060bc0beff4 100644
--- a/sql/ha_ndbcluster_binlog.cc
+++ b/sql/ha_ndbcluster_binlog.cc
@@ -953,8 +953,8 @@ static void ndbcluster_get_schema(NDB_SHARE *share,
/*
helper function to pack a ndb varchar
*/
-static char *ndb_pack_varchar(const NDBCOL *col, char *buf,
- const char *str, int sz)
+char *ndb_pack_varchar(const NDBCOL *col, char *buf,
+ const char *str, int sz)
{
switch (col->getArrayType())
{
diff --git a/sql/ha_ndbcluster_binlog.h b/sql/ha_ndbcluster_binlog.h
index 00fc689f061..7864cf3c0aa 100644
--- a/sql/ha_ndbcluster_binlog.h
+++ b/sql/ha_ndbcluster_binlog.h
@@ -181,6 +181,8 @@ int ndbcluster_find_all_files(THD *thd);
void ndb_unpack_record(TABLE *table, NdbValue *value,
MY_BITMAP *defined, byte *buf);
+char *ndb_pack_varchar(const NDBCOL *col, char *buf,
+ const char *str, int sz);
NDB_SHARE *ndbcluster_get_share(const char *key,
TABLE *table,