diff options
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r-- | sql/ha_ndbcluster.cc | 107 |
1 files changed, 87 insertions, 20 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 1af677fa754..9c4a2c20ca0 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -35,6 +35,7 @@ // options from from mysqld.cc extern my_bool opt_ndb_optimized_node_selection; extern const char *opt_ndbcluster_connectstring; +extern ulong opt_ndb_cache_check_time; // Default value for parallelism static const int parallelism= 0; @@ -228,13 +229,15 @@ static int ndb_to_mysql_error(const NdbError *err) inline -int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans) +int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans, + bool force_release) { #ifdef NOT_USED int m_batch_execute= 0; if (m_batch_execute) return 0; #endif + h->release_completed_operations(trans, force_release); return trans->execute(NdbTransaction::NoCommit, NdbTransaction::AbortOnError, h->m_force_send); @@ -267,13 +270,15 @@ int execute_commit(THD *thd, NdbTransaction *trans) } inline -int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans) +int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans, + bool force_release) { #ifdef NOT_USED int m_batch_execute= 0; if (m_batch_execute) return 0; #endif + h->release_completed_operations(trans, force_release); return trans->execute(NdbTransaction::NoCommit, NdbTransaction::AO_IgnoreError, h->m_force_send); @@ -290,6 +295,7 @@ Thd_ndb::Thd_ndb() all= NULL; stmt= NULL; error= 0; + query_state&= NDB_QUERY_NORMAL; } Thd_ndb::~Thd_ndb() @@ -1443,7 +1449,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); - if (execute_no_commit_ie(this,trans) != 0) + if (execute_no_commit_ie(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); @@ -1490,7 +1496,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) ERR_RETURN(trans->getNdbError()); } } - if (execute_no_commit(this,trans) != 0) + if (execute_no_commit(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); @@ -1630,7 +1636,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record) } last= trans->getLastDefinedOperation(); if (first) - res= execute_no_commit_ie(this,trans); + res= execute_no_commit_ie(this,trans,false); else { // Table has no keys @@ -1679,7 +1685,7 @@ int ha_ndbcluster::unique_index_read(const byte *key, if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); - if (execute_no_commit_ie(this,trans) != 0) + if (execute_no_commit_ie(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); @@ -1727,7 +1733,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) */ if (m_ops_pending && m_blobs_pending) { - if (execute_no_commit(this,trans) != 0) + if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); m_ops_pending= 0; m_blobs_pending= FALSE; @@ -1759,7 +1765,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) { if (m_transaction_on) { - if (execute_no_commit(this,trans) != 0) + if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(-1); } else @@ -2063,7 +2069,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, DBUG_RETURN(res); } - if (execute_no_commit(this,trans) != 0) + if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(next_result(buf)); @@ -2096,7 +2102,7 @@ int ha_ndbcluster::full_table_scan(byte *buf) if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); - if (execute_no_commit(this,trans) != 0) + if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_PRINT("exit", ("Scan started successfully")); DBUG_RETURN(next_result(buf)); @@ -2228,7 +2234,7 @@ int ha_ndbcluster::write_row(byte *record) m_bulk_insert_not_flushed= FALSE; if (m_transaction_on) { - if (execute_no_commit(this,trans) != 0) + if (execute_no_commit(this,trans,false) != 0) { m_skip_auto_increment= TRUE; no_uncommitted_rows_execute_failure(); @@ -2428,7 +2434,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) } // Execute update operation - if (!cursor && execute_no_commit(this,trans) != 0) { + if (!cursor && execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } @@ -2499,7 +2505,7 @@ int ha_ndbcluster::delete_row(const byte *record) } // Execute delete operation - if (execute_no_commit(this,trans) != 0) { + if (execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } @@ -2928,6 +2934,26 @@ int ha_ndbcluster::close_scan() NdbScanOperation *cursor= m_active_cursor ? m_active_cursor : m_multi_cursor; + if (m_lock_tuple) + { + /* + Lock level m_lock.type either TL_WRITE_ALLOW_WRITE + (SELECT FOR UPDATE) or TL_READ_WITH_SHARED_LOCKS (SELECT + LOCK WITH SHARE MODE) and row was not explictly unlocked + with unlock_row() call + */ + NdbOperation *op; + // Lock row + DBUG_PRINT("info", ("Keeping lock on scanned row")); + + if (!(op= cursor->lockCurrentTuple())) + { + m_lock_tuple= false; + ERR_RETURN(trans->getNdbError()); + } + m_ops_pending++; + } + m_lock_tuple= false; if (m_ops_pending) { /* @@ -2935,7 +2961,7 @@ int ha_ndbcluster::close_scan() deleteing/updating transaction before closing the scan */ DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending)); - if (execute_no_commit(this,trans) != 0) { + if (execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } @@ -3345,7 +3371,7 @@ int ha_ndbcluster::end_bulk_insert() m_bulk_insert_not_flushed= FALSE; if (m_transaction_on) { - if (execute_no_commit(this, trans) != 0) + if (execute_no_commit(this, trans,false) != 0) { no_uncommitted_rows_execute_failure(); my_errno= error= ndb_err(trans); @@ -3500,7 +3526,14 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) if (lock_type != F_UNLCK) { DBUG_PRINT("info", ("lock_type != F_UNLCK")); - if (!thd->transaction.on) + if (thd->lex->sql_command == SQLCOM_LOAD) + { + m_transaction_on= FALSE; + /* Would be simpler if has_transactions() didn't always say "yes" */ + thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; + thd->no_trans_update= TRUE; + } + else if (!thd->transaction.on) m_transaction_on= FALSE; else m_transaction_on= thd->variables.ndb_use_transactions; @@ -3518,6 +3551,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ERR_RETURN(ndb->getNdbError()); no_uncommitted_rows_reset(thd); thd_ndb->stmt= trans; + thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, FALSE, &ndbcluster_hton); } else @@ -3533,6 +3567,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ERR_RETURN(ndb->getNdbError()); no_uncommitted_rows_reset(thd); thd_ndb->all= trans; + thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, TRUE, &ndbcluster_hton); /* @@ -3739,6 +3774,7 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type) thd_ndb->stmt= trans; trans_register_ha(thd, FALSE, &ndbcluster_hton); } + thd_ndb->query_state&= NDB_QUERY_NORMAL; m_active_trans= trans; // Start of statement @@ -4147,10 +4183,15 @@ static void ndb_set_fragmentation(NDBTAB &tab, TABLE *form, uint pk_length) acc_row_size+= 4 + /*safety margin*/ 4; #endif ulonglong acc_fragment_size= 512*1024*1024; + /* + * if not --with-big-tables then max_rows is ulong + * the warning in this case is misleading though + */ + ulonglong big_max_rows = (ulonglong)max_rows; #if MYSQL_VERSION_ID >= 50100 - no_fragments= (max_rows*acc_row_size)/acc_fragment_size+1; + no_fragments= (big_max_rows*acc_row_size)/acc_fragment_size+1; #else - no_fragments= ((max_rows*acc_row_size)/acc_fragment_size+1 + no_fragments= ((big_max_rows*acc_row_size)/acc_fragment_size+1 +1/*correct rounding*/)/2; #endif } @@ -5210,6 +5251,7 @@ bool ndbcluster_init() pthread_cond_init(&COND_ndb_util_thread, NULL); + ndb_cache_check_time = opt_ndb_cache_check_time; // Create utility thread pthread_t tmp; if (pthread_create(&tmp, &connection_attrib, ndb_util_thread_func, 0)) @@ -5986,6 +6028,30 @@ int ha_ndbcluster::write_ndb_file() DBUG_RETURN(error); } +void +ha_ndbcluster::release_completed_operations(NdbTransaction *trans, + bool force_release) +{ + if (trans->hasBlobOperation()) + { + /* We are reading/writing BLOB fields, + releasing operation records is unsafe + */ + return; + } + if (!force_release) + { + if (get_thd_ndb(current_thd)->query_state & NDB_QUERY_MULTI_READ_RANGE) + { + /* We are batching reads and have not consumed all fetched + rows yet, releasing operation records is unsafe + */ + return; + } + } + trans->releaseCompletedOperations(); +} + int ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, KEY_MULTI_RANGE *ranges, @@ -6000,6 +6066,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, NDB_INDEX_TYPE index_type= get_index_type(active_index); ulong reclength= table->s->reclength; NdbOperation* op; + Thd_ndb *thd_ndb= get_thd_ndb(current_thd); if (uses_blob_value(m_retrieve_all_fields)) { @@ -6013,7 +6080,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, sorted, buffer)); } - + thd_ndb->query_state|= NDB_QUERY_MULTI_READ_RANGE; m_disable_multi_read= FALSE; /** @@ -6160,7 +6227,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, */ m_current_multi_operation= lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation(); - if (!(res= execute_no_commit_ie(this, m_active_trans))) + if (!(res= execute_no_commit_ie(this, m_active_trans, true))) { m_multi_range_defined= multi_range_curr; multi_range_curr= ranges; |