diff options
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r-- | sql/ha_ndbcluster.cc | 64 |
1 files changed, 63 insertions, 1 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 353dd806448..416cd18d924 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -4123,6 +4123,58 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd, - refresh list of the indexes for the table if needed (if altered) */ +#ifdef HAVE_NDB_BINLOG +extern MASTER_INFO *active_mi; +static int ndbcluster_update_apply_status(THD *thd, int do_update) +{ + Thd_ndb *thd_ndb= get_thd_ndb(thd); + Ndb *ndb= thd_ndb->ndb; + NDBDICT *dict= ndb->getDictionary(); + const NDBTAB *ndbtab; + NdbTransaction *trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt; + ndb->setDatabaseName(NDB_REP_DB); + Ndb_table_guard ndbtab_g(dict, NDB_APPLY_TABLE); + if (!(ndbtab= ndbtab_g.get_table())) + { + return -1; + } + NdbOperation *op= 0; + int r= 0; + r|= (op= trans->getNdbOperation(ndbtab)) == 0; + DBUG_ASSERT(r == 0); + if (do_update) + r|= op->updateTuple(); + else + r|= op->writeTuple(); + DBUG_ASSERT(r == 0); + // server_id + r|= op->equal(0u, (Uint64)thd->server_id); + DBUG_ASSERT(r == 0); + if (!do_update) + { + // epoch + r|= op->setValue(1u, (Uint64)0); + DBUG_ASSERT(r == 0); + } + // log_name + char tmp_buf[FN_REFLEN]; + ndb_pack_varchar(ndbtab->getColumn(2u), tmp_buf, + active_mi->rli.group_master_log_name, + strlen(active_mi->rli.group_master_log_name)); + r|= op->setValue(2u, tmp_buf); + DBUG_ASSERT(r == 0); + // start_pos + r|= op->setValue(3u, (Uint64)active_mi->rli.group_master_log_pos); + DBUG_ASSERT(r == 0); + // end_pos + r|= op->setValue(4u, (Uint64)active_mi->rli.group_master_log_pos + + ((Uint64)active_mi->rli.future_event_relay_log_pos - + (Uint64)active_mi->rli.group_relay_log_pos)); + DBUG_ASSERT(r == 0); + return 0; +} +#endif /* HAVE_NDB_BINLOG */ + int ha_ndbcluster::external_lock(THD *thd, int lock_type) { int error=0; @@ -4173,6 +4225,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) thd_ndb->init_open_tables(); thd_ndb->stmt= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; + thd_ndb->trans_options= 0; trans_register_ha(thd, FALSE, ndbcluster_hton); } else @@ -4189,6 +4242,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) thd_ndb->init_open_tables(); thd_ndb->all= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; + thd_ndb->trans_options= 0; trans_register_ha(thd, TRUE, ndbcluster_hton); /* @@ -4229,7 +4283,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) // Start of transaction m_rows_changed= 0; m_ops_pending= 0; - +#ifdef HAVE_NDB_BINLOG + if (m_share == ndb_apply_status_share && thd->slave_thread) + thd_ndb->trans_options|= TNTO_INJECTED_APPLY_STATUS; +#endif // TODO remove double pointers... m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table); m_table_info= &m_thd_ndb_share->stat; @@ -4372,6 +4429,11 @@ static int ndbcluster_commit(handlerton *hton, THD *thd, bool all) "stmt" : "all")); DBUG_ASSERT(ndb && trans); +#ifdef HAVE_NDB_BINLOG + if (thd->slave_thread) + ndbcluster_update_apply_status(thd, thd_ndb->trans_options & TNTO_INJECTED_APPLY_STATUS); +#endif /* HAVE_NDB_BINLOG */ + if (execute_commit(thd,trans) != 0) { const NdbError err= trans->getNdbError(); |