summaryrefslogtreecommitdiff
path: root/sql/ha_ndbcluster.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r--sql/ha_ndbcluster.cc64
1 files changed, 63 insertions, 1 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 353dd806448..416cd18d924 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -4123,6 +4123,58 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
- refresh list of the indexes for the table if needed (if altered)
*/
+#ifdef HAVE_NDB_BINLOG
+extern MASTER_INFO *active_mi;
+static int ndbcluster_update_apply_status(THD *thd, int do_update)
+{
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
+ Ndb *ndb= thd_ndb->ndb;
+ NDBDICT *dict= ndb->getDictionary();
+ const NDBTAB *ndbtab;
+ NdbTransaction *trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt;
+ ndb->setDatabaseName(NDB_REP_DB);
+ Ndb_table_guard ndbtab_g(dict, NDB_APPLY_TABLE);
+ if (!(ndbtab= ndbtab_g.get_table()))
+ {
+ return -1;
+ }
+ NdbOperation *op= 0;
+ int r= 0;
+ r|= (op= trans->getNdbOperation(ndbtab)) == 0;
+ DBUG_ASSERT(r == 0);
+ if (do_update)
+ r|= op->updateTuple();
+ else
+ r|= op->writeTuple();
+ DBUG_ASSERT(r == 0);
+ // server_id
+ r|= op->equal(0u, (Uint64)thd->server_id);
+ DBUG_ASSERT(r == 0);
+ if (!do_update)
+ {
+ // epoch
+ r|= op->setValue(1u, (Uint64)0);
+ DBUG_ASSERT(r == 0);
+ }
+ // log_name
+ char tmp_buf[FN_REFLEN];
+ ndb_pack_varchar(ndbtab->getColumn(2u), tmp_buf,
+ active_mi->rli.group_master_log_name,
+ strlen(active_mi->rli.group_master_log_name));
+ r|= op->setValue(2u, tmp_buf);
+ DBUG_ASSERT(r == 0);
+ // start_pos
+ r|= op->setValue(3u, (Uint64)active_mi->rli.group_master_log_pos);
+ DBUG_ASSERT(r == 0);
+ // end_pos
+ r|= op->setValue(4u, (Uint64)active_mi->rli.group_master_log_pos +
+ ((Uint64)active_mi->rli.future_event_relay_log_pos -
+ (Uint64)active_mi->rli.group_relay_log_pos));
+ DBUG_ASSERT(r == 0);
+ return 0;
+}
+#endif /* HAVE_NDB_BINLOG */
+
int ha_ndbcluster::external_lock(THD *thd, int lock_type)
{
int error=0;
@@ -4173,6 +4225,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
thd_ndb->init_open_tables();
thd_ndb->stmt= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
+ thd_ndb->trans_options= 0;
trans_register_ha(thd, FALSE, ndbcluster_hton);
}
else
@@ -4189,6 +4242,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
thd_ndb->init_open_tables();
thd_ndb->all= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
+ thd_ndb->trans_options= 0;
trans_register_ha(thd, TRUE, ndbcluster_hton);
/*
@@ -4229,7 +4283,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
// Start of transaction
m_rows_changed= 0;
m_ops_pending= 0;
-
+#ifdef HAVE_NDB_BINLOG
+ if (m_share == ndb_apply_status_share && thd->slave_thread)
+ thd_ndb->trans_options|= TNTO_INJECTED_APPLY_STATUS;
+#endif
// TODO remove double pointers...
m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table);
m_table_info= &m_thd_ndb_share->stat;
@@ -4372,6 +4429,11 @@ static int ndbcluster_commit(handlerton *hton, THD *thd, bool all)
"stmt" : "all"));
DBUG_ASSERT(ndb && trans);
+#ifdef HAVE_NDB_BINLOG
+ if (thd->slave_thread)
+ ndbcluster_update_apply_status(thd, thd_ndb->trans_options & TNTO_INJECTED_APPLY_STATUS);
+#endif /* HAVE_NDB_BINLOG */
+
if (execute_commit(thd,trans) != 0)
{
const NdbError err= trans->getNdbError();