summaryrefslogtreecommitdiff
path: root/sql/ha_ndbcluster.cc
diff options
context:
space:
mode:
authorunknown <tomas@whalegate.ndb.mysql.com>2007-10-30 11:28:19 +0100
committerunknown <tomas@whalegate.ndb.mysql.com>2007-10-30 11:28:19 +0100
commitc84d4b213905f37fd0095456dcdd0bfcae51a4f1 (patch)
tree4110aa2d735ea23d41f6683de6895b0ba90e17a7 /sql/ha_ndbcluster.cc
parent7d9c59b7be412210413fc93602ec4522eb1f368b (diff)
parent4ce44da3d5351b29e8dd0ed65bf49faeac21254b (diff)
downloadmariadb-git-c84d4b213905f37fd0095456dcdd0bfcae51a4f1.tar.gz
Merge whalegate.ndb.mysql.com:/home/tomas/cge-5.1
into whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb-merge mysql-test/suite/rpl/r/rpl_bug31076.result: Auto merged sql/ha_ndbcluster.cc: Auto merged sql/handler.cc: Auto merged sql/log_event.cc: Auto merged sql/sql_update.cc: Auto merged storage/ndb/include/ndbapi/Ndb.hpp: Auto merged mysql-test/suite/rpl/t/rpl_bug31076.test: manual merge
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r--sql/ha_ndbcluster.cc374
1 files changed, 204 insertions, 170 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index b2152fbb906..436710e3dee 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -325,9 +325,9 @@ Thd_ndb::Thd_ndb()
{
ndb= new Ndb(g_ndb_cluster_connection, "");
lock_count= 0;
+ start_stmt_count= 0;
count= 0;
- all= NULL;
- stmt= NULL;
+ trans= NULL;
m_error= FALSE;
m_error_code= 0;
query_state&= NDB_QUERY_NORMAL;
@@ -382,6 +382,11 @@ Thd_ndb::get_open_table(THD *thd, const void *key)
{
thd_ndb_share= (THD_NDB_SHARE *) alloc_root(&thd->transaction.mem_root,
sizeof(THD_NDB_SHARE));
+ if (!thd_ndb_share)
+ {
+ mem_alloc_error(sizeof(THD_NDB_SHARE));
+ DBUG_RETURN(NULL);
+ }
thd_ndb_share->key= key;
thd_ndb_share->stat.last_count= count;
thd_ndb_share->stat.no_uncommitted_rows_count= 0;
@@ -1638,6 +1643,26 @@ int ha_ndbcluster::set_primary_key_from_record(NdbOperation *op, const uchar *re
DBUG_RETURN(0);
}
+bool ha_ndbcluster::check_index_fields_in_write_set(uint keyno)
+{
+ KEY* key_info= table->key_info + keyno;
+ KEY_PART_INFO* key_part= key_info->key_part;
+ KEY_PART_INFO* end= key_part+key_info->key_parts;
+ uint i;
+ DBUG_ENTER("check_index_fields_in_write_set");
+
+ for (i= 0; key_part != end; key_part++, i++)
+ {
+ Field* field= key_part->field;
+ if (!bitmap_is_set(table->write_set, field->field_index))
+ {
+ DBUG_RETURN(false);
+ }
+ }
+
+ DBUG_RETURN(true);
+}
+
int ha_ndbcluster::set_index_key_from_record(NdbOperation *op,
const uchar *record, uint keyno)
{
@@ -1956,8 +1981,8 @@ check_null_in_record(const KEY* key_info, const uchar *record)
* primary key or unique index values
*/
-int ha_ndbcluster::peek_indexed_rows(const uchar *record,
- bool check_pk)
+int ha_ndbcluster::peek_indexed_rows(const uchar *record,
+ NDB_WRITE_OP write_op)
{
NdbTransaction *trans= m_active_trans;
NdbOperation *op;
@@ -1969,7 +1994,7 @@ int ha_ndbcluster::peek_indexed_rows(const uchar *record,
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
first= NULL;
- if (check_pk && table->s->primary_key != MAX_KEY)
+ if (write_op != NDB_UPDATE && table->s->primary_key != MAX_KEY)
{
/*
* Fetch any row with colliding primary key
@@ -2019,6 +2044,11 @@ int ha_ndbcluster::peek_indexed_rows(const uchar *record,
DBUG_PRINT("info", ("skipping check for key with NULL"));
continue;
}
+ if (write_op != NDB_INSERT && !check_index_fields_in_write_set(i))
+ {
+ DBUG_PRINT("info", ("skipping check for key %u not in write_set", i));
+ continue;
+ }
NdbIndexOperation *iop;
const NDBINDEX *unique_index = m_index[i].unique_index;
key_part= key_info->key_part;
@@ -2716,7 +2746,7 @@ int ha_ndbcluster::write_row(uchar *record)
start_bulk_insert will set parameters to ensure that each
write_row is committed individually
*/
- int peek_res= peek_indexed_rows(record, TRUE);
+ int peek_res= peek_indexed_rows(record, NDB_INSERT);
if (!peek_res)
{
@@ -2765,7 +2795,7 @@ int ha_ndbcluster::write_row(uchar *record)
if (ndb->getAutoIncrementValue(m_table, g.range, auto_value, 1) == -1)
{
if (--retries &&
- ndb->getNdbError().status == NdbError::TemporaryError);
+ ndb->getNdbError().status == NdbError::TemporaryError)
{
my_sleep(retry_sleep);
continue;
@@ -2960,7 +2990,8 @@ int ha_ndbcluster::update_row(const uchar *old_data, uchar *new_data)
if (m_ignore_dup_key && (thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI))
{
- int peek_res= peek_indexed_rows(new_data, pk_update);
+ NDB_WRITE_OP write_op= (pk_update) ? NDB_PK_UPDATE : NDB_UPDATE;
+ int peek_res= peek_indexed_rows(new_data, write_op);
if (!peek_res)
{
@@ -4327,7 +4358,7 @@ static int ndbcluster_update_apply_status(THD *thd, int do_update)
Ndb *ndb= thd_ndb->ndb;
NDBDICT *dict= ndb->getDictionary();
const NDBTAB *ndbtab;
- NdbTransaction *trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt;
+ NdbTransaction *trans= thd_ndb->trans;
ndb->setDatabaseName(NDB_REP_DB);
Ndb_table_guard ndbtab_g(dict, NDB_APPLY_TABLE);
if (!(ndbtab= ndbtab_g.get_table()))
@@ -4371,10 +4402,110 @@ static int ndbcluster_update_apply_status(THD *thd, int do_update)
}
#endif /* HAVE_NDB_BINLOG */
+void ha_ndbcluster::transaction_checks(THD *thd)
+{
+ if (thd->lex->sql_command == SQLCOM_LOAD)
+ {
+ m_transaction_on= FALSE;
+ /* Would be simpler if has_transactions() didn't always say "yes" */
+ thd->transaction.all.modified_non_trans_table=
+ thd->transaction.stmt.modified_non_trans_table= TRUE;
+ }
+ else if (!thd->transaction.on)
+ m_transaction_on= FALSE;
+ else
+ m_transaction_on= thd->variables.ndb_use_transactions;
+}
+
+int ha_ndbcluster::start_statement(THD *thd,
+ Thd_ndb *thd_ndb,
+ Ndb *ndb)
+{
+ DBUG_ENTER("ha_ndbcluster::start_statement");
+ PRINT_OPTION_FLAGS(thd);
+
+ trans_register_ha(thd, FALSE, ndbcluster_hton);
+ if (!thd_ndb->trans)
+ {
+ if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
+ trans_register_ha(thd, TRUE, ndbcluster_hton);
+ DBUG_PRINT("trans",("Starting transaction"));
+ thd_ndb->trans= ndb->startTransaction();
+ if (thd_ndb->trans == NULL)
+ ERR_RETURN(ndb->getNdbError());
+ thd_ndb->init_open_tables();
+ thd_ndb->query_state&= NDB_QUERY_NORMAL;
+ thd_ndb->trans_options= 0;
+ thd_ndb->m_slow_path= FALSE;
+ if (!(thd->options & OPTION_BIN_LOG) ||
+ thd->variables.binlog_format == BINLOG_FORMAT_STMT)
+ {
+ thd_ndb->trans_options|= TNTO_NO_LOGGING;
+ thd_ndb->m_slow_path= TRUE;
+ }
+ else if (thd->slave_thread)
+ thd_ndb->m_slow_path= TRUE;
+ }
+ /*
+ If this is the start of a LOCK TABLE, a table look
+ should be taken on the table in NDB
+
+ Check if it should be read or write lock
+ */
+ if (thd->options & (OPTION_TABLE_LOCK))
+ {
+ //lockThisTable();
+ DBUG_PRINT("info", ("Locking the table..." ));
+ }
+ DBUG_RETURN(0);
+}
+
+int ha_ndbcluster::init_handler_for_statement(THD *thd, Thd_ndb *thd_ndb)
+{
+ /*
+ This is the place to make sure this handler instance
+ has a started transaction.
+
+ The transaction is started by the first handler on which
+ MySQL Server calls external lock
+
+ Other handlers in the same stmt or transaction should use
+ the same NDB transaction. This is done by setting up the m_active_trans
+ pointer to point to the NDB transaction.
+ */
+
+ DBUG_ENTER("ha_ndbcluster::init_handler_for_statement");
+ // store thread specific data first to set the right context
+ m_force_send= thd->variables.ndb_force_send;
+ m_ha_not_exact_count= !thd->variables.ndb_use_exact_count;
+ m_autoincrement_prefetch=
+ (ha_rows) thd->variables.ndb_autoincrement_prefetch_sz;
+
+ m_active_trans= thd_ndb->trans;
+ DBUG_ASSERT(m_active_trans);
+ // Start of transaction
+ m_rows_changed= 0;
+ m_ops_pending= 0;
+ m_slow_path= thd_ndb->m_slow_path;
+#ifdef HAVE_NDB_BINLOG
+ if (unlikely(m_slow_path))
+ {
+ if (m_share == ndb_apply_status_share && thd->slave_thread)
+ thd_ndb->trans_options|= TNTO_INJECTED_APPLY_STATUS;
+ }
+#endif
+ // TODO remove double pointers...
+ if (!(m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table)))
+ {
+ DBUG_RETURN(1);
+ }
+ m_table_info= &m_thd_ndb_share->stat;
+ DBUG_RETURN(0);
+}
+
int ha_ndbcluster::external_lock(THD *thd, int lock_type)
{
int error=0;
- NdbTransaction* trans= NULL;
DBUG_ENTER("external_lock");
/*
@@ -4395,124 +4526,15 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
if (lock_type != F_UNLCK)
{
DBUG_PRINT("info", ("lock_type != F_UNLCK"));
- if (thd->lex->sql_command == SQLCOM_LOAD)
- {
- m_transaction_on= FALSE;
- /* Would be simpler if has_transactions() didn't always say "yes" */
- thd->transaction.all.modified_non_trans_table= thd->transaction.stmt.modified_non_trans_table= TRUE;
- }
- else if (!thd->transaction.on)
- m_transaction_on= FALSE;
- else
- m_transaction_on= thd->variables.ndb_use_transactions;
+ transaction_checks(thd);
if (!thd_ndb->lock_count++)
{
- PRINT_OPTION_FLAGS(thd);
- if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
- {
- // Autocommit transaction
- DBUG_ASSERT(!thd_ndb->stmt);
- DBUG_PRINT("trans",("Starting transaction stmt"));
-
- trans= ndb->startTransaction();
- if (trans == NULL)
- {
- thd_ndb->lock_count= 0;
- ERR_RETURN(ndb->getNdbError());
- }
- thd_ndb->init_open_tables();
- thd_ndb->stmt= trans;
- thd_ndb->query_state&= NDB_QUERY_NORMAL;
- thd_ndb->trans_options= 0;
- thd_ndb->m_slow_path= FALSE;
- if (!(thd->options & OPTION_BIN_LOG) ||
- thd->variables.binlog_format == BINLOG_FORMAT_STMT)
- {
- thd_ndb->trans_options|= TNTO_NO_LOGGING;
- thd_ndb->m_slow_path= TRUE;
- }
- else if (thd->slave_thread)
- thd_ndb->m_slow_path= TRUE;
- trans_register_ha(thd, FALSE, ndbcluster_hton);
- }
- else
- {
- if (!thd_ndb->all)
- {
- // Not autocommit transaction
- // A "master" transaction ha not been started yet
- DBUG_PRINT("trans",("starting transaction, all"));
-
- trans= ndb->startTransaction();
- if (trans == NULL)
- {
- thd_ndb->lock_count= 0;
- ERR_RETURN(ndb->getNdbError());
- }
- thd_ndb->init_open_tables();
- thd_ndb->all= trans;
- thd_ndb->query_state&= NDB_QUERY_NORMAL;
- thd_ndb->trans_options= 0;
- thd_ndb->m_slow_path= FALSE;
- if (!(thd->options & OPTION_BIN_LOG) ||
- thd->variables.binlog_format == BINLOG_FORMAT_STMT)
- {
- thd_ndb->trans_options|= TNTO_NO_LOGGING;
- thd_ndb->m_slow_path= TRUE;
- }
- else if (thd->slave_thread)
- thd_ndb->m_slow_path= TRUE;
- trans_register_ha(thd, TRUE, ndbcluster_hton);
-
- /*
- If this is the start of a LOCK TABLE, a table look
- should be taken on the table in NDB
-
- Check if it should be read or write lock
- */
- if (thd->options & (OPTION_TABLE_LOCK))
- {
- //lockThisTable();
- DBUG_PRINT("info", ("Locking the table..." ));
- }
-
- }
- }
- }
- /*
- This is the place to make sure this handler instance
- has a started transaction.
-
- The transaction is started by the first handler on which
- MySQL Server calls external lock
-
- Other handlers in the same stmt or transaction should use
- the same NDB transaction. This is done by setting up the m_active_trans
- pointer to point to the NDB transaction.
- */
-
- // store thread specific data first to set the right context
- m_force_send= thd->variables.ndb_force_send;
- m_ha_not_exact_count= !thd->variables.ndb_use_exact_count;
- m_autoincrement_prefetch=
- (ha_rows) thd->variables.ndb_autoincrement_prefetch_sz;
-
- m_active_trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt;
- DBUG_ASSERT(m_active_trans);
- // Start of transaction
- m_rows_changed= 0;
- m_ops_pending= 0;
- m_slow_path= thd_ndb->m_slow_path;
-#ifdef HAVE_NDB_BINLOG
- if (unlikely(m_slow_path))
- {
- if (m_share == ndb_apply_status_share && thd->slave_thread)
- thd_ndb->trans_options|= TNTO_INJECTED_APPLY_STATUS;
+ if ((error= start_statement(thd, thd_ndb, ndb)))
+ goto error;
}
-#endif
- // TODO remove double pointers...
- m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table);
- m_table_info= &m_thd_ndb_share->stat;
+ if ((error= init_handler_for_statement(thd, thd_ndb)))
+ goto error;
+ DBUG_RETURN(0);
}
else
{
@@ -4540,16 +4562,19 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
DBUG_PRINT("trans", ("Last external_lock"));
PRINT_OPTION_FLAGS(thd);
- if (thd_ndb->stmt)
+ if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
{
- /*
- Unlock is done without a transaction commit / rollback.
- This happens if the thread didn't update any rows
- We must in this case close the transaction to release resources
- */
- DBUG_PRINT("trans",("ending non-updating transaction"));
- ndb->closeTransaction(m_active_trans);
- thd_ndb->stmt= NULL;
+ if (thd_ndb->trans)
+ {
+ /*
+ Unlock is done without a transaction commit / rollback.
+ This happens if the thread didn't update any rows
+ We must in this case close the transaction to release resources
+ */
+ DBUG_PRINT("trans",("ending non-updating transaction"));
+ ndb->closeTransaction(thd_ndb->trans);
+ thd_ndb->trans= NULL;
+ }
}
}
m_table_info= NULL;
@@ -4578,7 +4603,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
if (m_ops_pending)
DBUG_PRINT("warning", ("ops_pending != 0L"));
m_ops_pending= 0;
+ DBUG_RETURN(0);
}
+error:
+ thd_ndb->lock_count--;
DBUG_RETURN(error);
}
@@ -4610,25 +4638,20 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type)
{
int error=0;
DBUG_ENTER("start_stmt");
- PRINT_OPTION_FLAGS(thd);
Thd_ndb *thd_ndb= get_thd_ndb(thd);
- NdbTransaction *trans= (thd_ndb->stmt)?thd_ndb->stmt:thd_ndb->all;
- if (!trans){
+ transaction_checks(thd);
+ if (!thd_ndb->start_stmt_count++)
+ {
Ndb *ndb= thd_ndb->ndb;
- DBUG_PRINT("trans",("Starting transaction stmt"));
- trans= ndb->startTransaction();
- if (trans == NULL)
- ERR_RETURN(ndb->getNdbError());
- no_uncommitted_rows_reset(thd);
- thd_ndb->stmt= trans;
- thd_ndb->query_state&= NDB_QUERY_NORMAL;
- trans_register_ha(thd, FALSE, ndbcluster_hton);
+ if ((error= start_statement(thd, thd_ndb, ndb)))
+ goto error;
}
- m_active_trans= trans;
- // Start of statement
- m_ops_pending= 0;
-
+ if ((error= init_handler_for_statement(thd, thd_ndb)))
+ goto error;
+ DBUG_RETURN(0);
+error:
+ thd_ndb->start_stmt_count--;
DBUG_RETURN(error);
}
@@ -4642,15 +4665,29 @@ static int ndbcluster_commit(handlerton *hton, THD *thd, bool all)
int res= 0;
Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb;
- NdbTransaction *trans= all ? thd_ndb->all : thd_ndb->stmt;
+ NdbTransaction *trans= thd_ndb->trans;
DBUG_ENTER("ndbcluster_commit");
- DBUG_PRINT("transaction",("%s",
- trans == thd_ndb->stmt ?
- "stmt" : "all"));
DBUG_ASSERT(ndb);
- if (trans == NULL)
+ PRINT_OPTION_FLAGS(thd);
+ DBUG_PRINT("enter", ("Commit %s", (all ? "all" : "stmt")));
+ thd_ndb->start_stmt_count= 0;
+ if (trans == NULL || (!all &&
+ thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
+ {
+ /*
+ An odditity in the handler interface is that commit on handlerton
+ is called to indicate end of statement only in cases where
+ autocommit isn't used and the all flag isn't set.
+
+ We also leave quickly when a transaction haven't even been started,
+ in this case we are safe that no clean up is needed. In this case
+ the MySQL Server could handle the query without contacting the
+ NDB kernel.
+ */
+ DBUG_PRINT("info", ("Commit before start or end-of-statement only"));
DBUG_RETURN(0);
+ }
#ifdef HAVE_NDB_BINLOG
if (unlikely(thd_ndb->m_slow_path))
@@ -4671,11 +4708,7 @@ static int ndbcluster_commit(handlerton *hton, THD *thd, bool all)
ndbcluster_print_error(res, error_op);
}
ndb->closeTransaction(trans);
-
- if (all)
- thd_ndb->all= NULL;
- else
- thd_ndb->stmt= NULL;
+ thd_ndb->trans= NULL;
/* Clear commit_count for tables changed by transaction */
NDB_SHARE* share;
@@ -4704,13 +4737,18 @@ static int ndbcluster_rollback(handlerton *hton, THD *thd, bool all)
int res= 0;
Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb;
- NdbTransaction *trans= all ? thd_ndb->all : thd_ndb->stmt;
+ NdbTransaction *trans= thd_ndb->trans;
DBUG_ENTER("ndbcluster_rollback");
- DBUG_PRINT("transaction",("%s",
- trans == thd_ndb->stmt ?
- "stmt" : "all"));
- DBUG_ASSERT(ndb && trans);
+ DBUG_ASSERT(ndb);
+ thd_ndb->start_stmt_count= 0;
+ if (trans == NULL || (!all &&
+ thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
+ {
+ /* Ignore end-of-statement until real rollback or commit is called */
+ DBUG_PRINT("info", ("Rollback before start or end-of-statement only"));
+ DBUG_RETURN(0);
+ }
if (trans->execute(NdbTransaction::Rollback) != 0)
{
@@ -4722,11 +4760,7 @@ static int ndbcluster_rollback(handlerton *hton, THD *thd, bool all)
ndbcluster_print_error(res, error_op);
}
ndb->closeTransaction(trans);
-
- if (all)
- thd_ndb->all= NULL;
- else
- thd_ndb->stmt= NULL;
+ thd_ndb->trans= NULL;
/* Clear list of tables changed by transaction */
thd_ndb->changed_tables.empty();
@@ -6155,7 +6189,7 @@ void ha_ndbcluster::get_auto_increment(ulonglong offset, ulonglong increment,
ndb->getAutoIncrementValue(m_table, g.range, auto_value, cache_size, increment, offset))
{
if (--retries &&
- ndb->getNdbError().status == NdbError::TemporaryError);
+ ndb->getNdbError().status == NdbError::TemporaryError)
{
my_sleep(retry_sleep);
continue;