diff options
author | unknown <tomas@whalegate.ndb.mysql.com> | 2007-10-30 11:28:19 +0100 |
---|---|---|
committer | unknown <tomas@whalegate.ndb.mysql.com> | 2007-10-30 11:28:19 +0100 |
commit | c84d4b213905f37fd0095456dcdd0bfcae51a4f1 (patch) | |
tree | 4110aa2d735ea23d41f6683de6895b0ba90e17a7 /sql/ha_ndbcluster.cc | |
parent | 7d9c59b7be412210413fc93602ec4522eb1f368b (diff) | |
parent | 4ce44da3d5351b29e8dd0ed65bf49faeac21254b (diff) | |
download | mariadb-git-c84d4b213905f37fd0095456dcdd0bfcae51a4f1.tar.gz |
Merge whalegate.ndb.mysql.com:/home/tomas/cge-5.1
into whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb-merge
mysql-test/suite/rpl/r/rpl_bug31076.result:
Auto merged
sql/ha_ndbcluster.cc:
Auto merged
sql/handler.cc:
Auto merged
sql/log_event.cc:
Auto merged
sql/sql_update.cc:
Auto merged
storage/ndb/include/ndbapi/Ndb.hpp:
Auto merged
mysql-test/suite/rpl/t/rpl_bug31076.test:
manual merge
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r-- | sql/ha_ndbcluster.cc | 374 |
1 files changed, 204 insertions, 170 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index b2152fbb906..436710e3dee 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -325,9 +325,9 @@ Thd_ndb::Thd_ndb() { ndb= new Ndb(g_ndb_cluster_connection, ""); lock_count= 0; + start_stmt_count= 0; count= 0; - all= NULL; - stmt= NULL; + trans= NULL; m_error= FALSE; m_error_code= 0; query_state&= NDB_QUERY_NORMAL; @@ -382,6 +382,11 @@ Thd_ndb::get_open_table(THD *thd, const void *key) { thd_ndb_share= (THD_NDB_SHARE *) alloc_root(&thd->transaction.mem_root, sizeof(THD_NDB_SHARE)); + if (!thd_ndb_share) + { + mem_alloc_error(sizeof(THD_NDB_SHARE)); + DBUG_RETURN(NULL); + } thd_ndb_share->key= key; thd_ndb_share->stat.last_count= count; thd_ndb_share->stat.no_uncommitted_rows_count= 0; @@ -1638,6 +1643,26 @@ int ha_ndbcluster::set_primary_key_from_record(NdbOperation *op, const uchar *re DBUG_RETURN(0); } +bool ha_ndbcluster::check_index_fields_in_write_set(uint keyno) +{ + KEY* key_info= table->key_info + keyno; + KEY_PART_INFO* key_part= key_info->key_part; + KEY_PART_INFO* end= key_part+key_info->key_parts; + uint i; + DBUG_ENTER("check_index_fields_in_write_set"); + + for (i= 0; key_part != end; key_part++, i++) + { + Field* field= key_part->field; + if (!bitmap_is_set(table->write_set, field->field_index)) + { + DBUG_RETURN(false); + } + } + + DBUG_RETURN(true); +} + int ha_ndbcluster::set_index_key_from_record(NdbOperation *op, const uchar *record, uint keyno) { @@ -1956,8 +1981,8 @@ check_null_in_record(const KEY* key_info, const uchar *record) * primary key or unique index values */ -int ha_ndbcluster::peek_indexed_rows(const uchar *record, - bool check_pk) +int ha_ndbcluster::peek_indexed_rows(const uchar *record, + NDB_WRITE_OP write_op) { NdbTransaction *trans= m_active_trans; NdbOperation *op; @@ -1969,7 +1994,7 @@ int ha_ndbcluster::peek_indexed_rows(const uchar *record, NdbOperation::LockMode lm= (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); first= NULL; - if (check_pk && table->s->primary_key != MAX_KEY) + if (write_op != NDB_UPDATE && table->s->primary_key != MAX_KEY) { /* * Fetch any row with colliding primary key @@ -2019,6 +2044,11 @@ int ha_ndbcluster::peek_indexed_rows(const uchar *record, DBUG_PRINT("info", ("skipping check for key with NULL")); continue; } + if (write_op != NDB_INSERT && !check_index_fields_in_write_set(i)) + { + DBUG_PRINT("info", ("skipping check for key %u not in write_set", i)); + continue; + } NdbIndexOperation *iop; const NDBINDEX *unique_index = m_index[i].unique_index; key_part= key_info->key_part; @@ -2716,7 +2746,7 @@ int ha_ndbcluster::write_row(uchar *record) start_bulk_insert will set parameters to ensure that each write_row is committed individually */ - int peek_res= peek_indexed_rows(record, TRUE); + int peek_res= peek_indexed_rows(record, NDB_INSERT); if (!peek_res) { @@ -2765,7 +2795,7 @@ int ha_ndbcluster::write_row(uchar *record) if (ndb->getAutoIncrementValue(m_table, g.range, auto_value, 1) == -1) { if (--retries && - ndb->getNdbError().status == NdbError::TemporaryError); + ndb->getNdbError().status == NdbError::TemporaryError) { my_sleep(retry_sleep); continue; @@ -2960,7 +2990,8 @@ int ha_ndbcluster::update_row(const uchar *old_data, uchar *new_data) if (m_ignore_dup_key && (thd->lex->sql_command == SQLCOM_UPDATE || thd->lex->sql_command == SQLCOM_UPDATE_MULTI)) { - int peek_res= peek_indexed_rows(new_data, pk_update); + NDB_WRITE_OP write_op= (pk_update) ? NDB_PK_UPDATE : NDB_UPDATE; + int peek_res= peek_indexed_rows(new_data, write_op); if (!peek_res) { @@ -4327,7 +4358,7 @@ static int ndbcluster_update_apply_status(THD *thd, int do_update) Ndb *ndb= thd_ndb->ndb; NDBDICT *dict= ndb->getDictionary(); const NDBTAB *ndbtab; - NdbTransaction *trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt; + NdbTransaction *trans= thd_ndb->trans; ndb->setDatabaseName(NDB_REP_DB); Ndb_table_guard ndbtab_g(dict, NDB_APPLY_TABLE); if (!(ndbtab= ndbtab_g.get_table())) @@ -4371,10 +4402,110 @@ static int ndbcluster_update_apply_status(THD *thd, int do_update) } #endif /* HAVE_NDB_BINLOG */ +void ha_ndbcluster::transaction_checks(THD *thd) +{ + if (thd->lex->sql_command == SQLCOM_LOAD) + { + m_transaction_on= FALSE; + /* Would be simpler if has_transactions() didn't always say "yes" */ + thd->transaction.all.modified_non_trans_table= + thd->transaction.stmt.modified_non_trans_table= TRUE; + } + else if (!thd->transaction.on) + m_transaction_on= FALSE; + else + m_transaction_on= thd->variables.ndb_use_transactions; +} + +int ha_ndbcluster::start_statement(THD *thd, + Thd_ndb *thd_ndb, + Ndb *ndb) +{ + DBUG_ENTER("ha_ndbcluster::start_statement"); + PRINT_OPTION_FLAGS(thd); + + trans_register_ha(thd, FALSE, ndbcluster_hton); + if (!thd_ndb->trans) + { + if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + trans_register_ha(thd, TRUE, ndbcluster_hton); + DBUG_PRINT("trans",("Starting transaction")); + thd_ndb->trans= ndb->startTransaction(); + if (thd_ndb->trans == NULL) + ERR_RETURN(ndb->getNdbError()); + thd_ndb->init_open_tables(); + thd_ndb->query_state&= NDB_QUERY_NORMAL; + thd_ndb->trans_options= 0; + thd_ndb->m_slow_path= FALSE; + if (!(thd->options & OPTION_BIN_LOG) || + thd->variables.binlog_format == BINLOG_FORMAT_STMT) + { + thd_ndb->trans_options|= TNTO_NO_LOGGING; + thd_ndb->m_slow_path= TRUE; + } + else if (thd->slave_thread) + thd_ndb->m_slow_path= TRUE; + } + /* + If this is the start of a LOCK TABLE, a table look + should be taken on the table in NDB + + Check if it should be read or write lock + */ + if (thd->options & (OPTION_TABLE_LOCK)) + { + //lockThisTable(); + DBUG_PRINT("info", ("Locking the table..." )); + } + DBUG_RETURN(0); +} + +int ha_ndbcluster::init_handler_for_statement(THD *thd, Thd_ndb *thd_ndb) +{ + /* + This is the place to make sure this handler instance + has a started transaction. + + The transaction is started by the first handler on which + MySQL Server calls external lock + + Other handlers in the same stmt or transaction should use + the same NDB transaction. This is done by setting up the m_active_trans + pointer to point to the NDB transaction. + */ + + DBUG_ENTER("ha_ndbcluster::init_handler_for_statement"); + // store thread specific data first to set the right context + m_force_send= thd->variables.ndb_force_send; + m_ha_not_exact_count= !thd->variables.ndb_use_exact_count; + m_autoincrement_prefetch= + (ha_rows) thd->variables.ndb_autoincrement_prefetch_sz; + + m_active_trans= thd_ndb->trans; + DBUG_ASSERT(m_active_trans); + // Start of transaction + m_rows_changed= 0; + m_ops_pending= 0; + m_slow_path= thd_ndb->m_slow_path; +#ifdef HAVE_NDB_BINLOG + if (unlikely(m_slow_path)) + { + if (m_share == ndb_apply_status_share && thd->slave_thread) + thd_ndb->trans_options|= TNTO_INJECTED_APPLY_STATUS; + } +#endif + // TODO remove double pointers... + if (!(m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table))) + { + DBUG_RETURN(1); + } + m_table_info= &m_thd_ndb_share->stat; + DBUG_RETURN(0); +} + int ha_ndbcluster::external_lock(THD *thd, int lock_type) { int error=0; - NdbTransaction* trans= NULL; DBUG_ENTER("external_lock"); /* @@ -4395,124 +4526,15 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) if (lock_type != F_UNLCK) { DBUG_PRINT("info", ("lock_type != F_UNLCK")); - if (thd->lex->sql_command == SQLCOM_LOAD) - { - m_transaction_on= FALSE; - /* Would be simpler if has_transactions() didn't always say "yes" */ - thd->transaction.all.modified_non_trans_table= thd->transaction.stmt.modified_non_trans_table= TRUE; - } - else if (!thd->transaction.on) - m_transaction_on= FALSE; - else - m_transaction_on= thd->variables.ndb_use_transactions; + transaction_checks(thd); if (!thd_ndb->lock_count++) { - PRINT_OPTION_FLAGS(thd); - if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) - { - // Autocommit transaction - DBUG_ASSERT(!thd_ndb->stmt); - DBUG_PRINT("trans",("Starting transaction stmt")); - - trans= ndb->startTransaction(); - if (trans == NULL) - { - thd_ndb->lock_count= 0; - ERR_RETURN(ndb->getNdbError()); - } - thd_ndb->init_open_tables(); - thd_ndb->stmt= trans; - thd_ndb->query_state&= NDB_QUERY_NORMAL; - thd_ndb->trans_options= 0; - thd_ndb->m_slow_path= FALSE; - if (!(thd->options & OPTION_BIN_LOG) || - thd->variables.binlog_format == BINLOG_FORMAT_STMT) - { - thd_ndb->trans_options|= TNTO_NO_LOGGING; - thd_ndb->m_slow_path= TRUE; - } - else if (thd->slave_thread) - thd_ndb->m_slow_path= TRUE; - trans_register_ha(thd, FALSE, ndbcluster_hton); - } - else - { - if (!thd_ndb->all) - { - // Not autocommit transaction - // A "master" transaction ha not been started yet - DBUG_PRINT("trans",("starting transaction, all")); - - trans= ndb->startTransaction(); - if (trans == NULL) - { - thd_ndb->lock_count= 0; - ERR_RETURN(ndb->getNdbError()); - } - thd_ndb->init_open_tables(); - thd_ndb->all= trans; - thd_ndb->query_state&= NDB_QUERY_NORMAL; - thd_ndb->trans_options= 0; - thd_ndb->m_slow_path= FALSE; - if (!(thd->options & OPTION_BIN_LOG) || - thd->variables.binlog_format == BINLOG_FORMAT_STMT) - { - thd_ndb->trans_options|= TNTO_NO_LOGGING; - thd_ndb->m_slow_path= TRUE; - } - else if (thd->slave_thread) - thd_ndb->m_slow_path= TRUE; - trans_register_ha(thd, TRUE, ndbcluster_hton); - - /* - If this is the start of a LOCK TABLE, a table look - should be taken on the table in NDB - - Check if it should be read or write lock - */ - if (thd->options & (OPTION_TABLE_LOCK)) - { - //lockThisTable(); - DBUG_PRINT("info", ("Locking the table..." )); - } - - } - } - } - /* - This is the place to make sure this handler instance - has a started transaction. - - The transaction is started by the first handler on which - MySQL Server calls external lock - - Other handlers in the same stmt or transaction should use - the same NDB transaction. This is done by setting up the m_active_trans - pointer to point to the NDB transaction. - */ - - // store thread specific data first to set the right context - m_force_send= thd->variables.ndb_force_send; - m_ha_not_exact_count= !thd->variables.ndb_use_exact_count; - m_autoincrement_prefetch= - (ha_rows) thd->variables.ndb_autoincrement_prefetch_sz; - - m_active_trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt; - DBUG_ASSERT(m_active_trans); - // Start of transaction - m_rows_changed= 0; - m_ops_pending= 0; - m_slow_path= thd_ndb->m_slow_path; -#ifdef HAVE_NDB_BINLOG - if (unlikely(m_slow_path)) - { - if (m_share == ndb_apply_status_share && thd->slave_thread) - thd_ndb->trans_options|= TNTO_INJECTED_APPLY_STATUS; + if ((error= start_statement(thd, thd_ndb, ndb))) + goto error; } -#endif - // TODO remove double pointers... - m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table); - m_table_info= &m_thd_ndb_share->stat; + if ((error= init_handler_for_statement(thd, thd_ndb))) + goto error; + DBUG_RETURN(0); } else { @@ -4540,16 +4562,19 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) DBUG_PRINT("trans", ("Last external_lock")); PRINT_OPTION_FLAGS(thd); - if (thd_ndb->stmt) + if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) { - /* - Unlock is done without a transaction commit / rollback. - This happens if the thread didn't update any rows - We must in this case close the transaction to release resources - */ - DBUG_PRINT("trans",("ending non-updating transaction")); - ndb->closeTransaction(m_active_trans); - thd_ndb->stmt= NULL; + if (thd_ndb->trans) + { + /* + Unlock is done without a transaction commit / rollback. + This happens if the thread didn't update any rows + We must in this case close the transaction to release resources + */ + DBUG_PRINT("trans",("ending non-updating transaction")); + ndb->closeTransaction(thd_ndb->trans); + thd_ndb->trans= NULL; + } } } m_table_info= NULL; @@ -4578,7 +4603,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) if (m_ops_pending) DBUG_PRINT("warning", ("ops_pending != 0L")); m_ops_pending= 0; + DBUG_RETURN(0); } +error: + thd_ndb->lock_count--; DBUG_RETURN(error); } @@ -4610,25 +4638,20 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type) { int error=0; DBUG_ENTER("start_stmt"); - PRINT_OPTION_FLAGS(thd); Thd_ndb *thd_ndb= get_thd_ndb(thd); - NdbTransaction *trans= (thd_ndb->stmt)?thd_ndb->stmt:thd_ndb->all; - if (!trans){ + transaction_checks(thd); + if (!thd_ndb->start_stmt_count++) + { Ndb *ndb= thd_ndb->ndb; - DBUG_PRINT("trans",("Starting transaction stmt")); - trans= ndb->startTransaction(); - if (trans == NULL) - ERR_RETURN(ndb->getNdbError()); - no_uncommitted_rows_reset(thd); - thd_ndb->stmt= trans; - thd_ndb->query_state&= NDB_QUERY_NORMAL; - trans_register_ha(thd, FALSE, ndbcluster_hton); + if ((error= start_statement(thd, thd_ndb, ndb))) + goto error; } - m_active_trans= trans; - // Start of statement - m_ops_pending= 0; - + if ((error= init_handler_for_statement(thd, thd_ndb))) + goto error; + DBUG_RETURN(0); +error: + thd_ndb->start_stmt_count--; DBUG_RETURN(error); } @@ -4642,15 +4665,29 @@ static int ndbcluster_commit(handlerton *hton, THD *thd, bool all) int res= 0; Thd_ndb *thd_ndb= get_thd_ndb(thd); Ndb *ndb= thd_ndb->ndb; - NdbTransaction *trans= all ? thd_ndb->all : thd_ndb->stmt; + NdbTransaction *trans= thd_ndb->trans; DBUG_ENTER("ndbcluster_commit"); - DBUG_PRINT("transaction",("%s", - trans == thd_ndb->stmt ? - "stmt" : "all")); DBUG_ASSERT(ndb); - if (trans == NULL) + PRINT_OPTION_FLAGS(thd); + DBUG_PRINT("enter", ("Commit %s", (all ? "all" : "stmt"))); + thd_ndb->start_stmt_count= 0; + if (trans == NULL || (!all && + thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) + { + /* + An odditity in the handler interface is that commit on handlerton + is called to indicate end of statement only in cases where + autocommit isn't used and the all flag isn't set. + + We also leave quickly when a transaction haven't even been started, + in this case we are safe that no clean up is needed. In this case + the MySQL Server could handle the query without contacting the + NDB kernel. + */ + DBUG_PRINT("info", ("Commit before start or end-of-statement only")); DBUG_RETURN(0); + } #ifdef HAVE_NDB_BINLOG if (unlikely(thd_ndb->m_slow_path)) @@ -4671,11 +4708,7 @@ static int ndbcluster_commit(handlerton *hton, THD *thd, bool all) ndbcluster_print_error(res, error_op); } ndb->closeTransaction(trans); - - if (all) - thd_ndb->all= NULL; - else - thd_ndb->stmt= NULL; + thd_ndb->trans= NULL; /* Clear commit_count for tables changed by transaction */ NDB_SHARE* share; @@ -4704,13 +4737,18 @@ static int ndbcluster_rollback(handlerton *hton, THD *thd, bool all) int res= 0; Thd_ndb *thd_ndb= get_thd_ndb(thd); Ndb *ndb= thd_ndb->ndb; - NdbTransaction *trans= all ? thd_ndb->all : thd_ndb->stmt; + NdbTransaction *trans= thd_ndb->trans; DBUG_ENTER("ndbcluster_rollback"); - DBUG_PRINT("transaction",("%s", - trans == thd_ndb->stmt ? - "stmt" : "all")); - DBUG_ASSERT(ndb && trans); + DBUG_ASSERT(ndb); + thd_ndb->start_stmt_count= 0; + if (trans == NULL || (!all && + thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) + { + /* Ignore end-of-statement until real rollback or commit is called */ + DBUG_PRINT("info", ("Rollback before start or end-of-statement only")); + DBUG_RETURN(0); + } if (trans->execute(NdbTransaction::Rollback) != 0) { @@ -4722,11 +4760,7 @@ static int ndbcluster_rollback(handlerton *hton, THD *thd, bool all) ndbcluster_print_error(res, error_op); } ndb->closeTransaction(trans); - - if (all) - thd_ndb->all= NULL; - else - thd_ndb->stmt= NULL; + thd_ndb->trans= NULL; /* Clear list of tables changed by transaction */ thd_ndb->changed_tables.empty(); @@ -6155,7 +6189,7 @@ void ha_ndbcluster::get_auto_increment(ulonglong offset, ulonglong increment, ndb->getAutoIncrementValue(m_table, g.range, auto_value, cache_size, increment, offset)) { if (--retries && - ndb->getNdbError().status == NdbError::TemporaryError); + ndb->getNdbError().status == NdbError::TemporaryError) { my_sleep(retry_sleep); continue; |