Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r--  sql/ha_ndbcluster.cc  107
1 file changed, 87 insertions(+), 20 deletions(-)
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 1af677fa754..9c4a2c20ca0 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -35,6 +35,7 @@
// options from mysqld.cc
extern my_bool opt_ndb_optimized_node_selection;
extern const char *opt_ndbcluster_connectstring;
+extern ulong opt_ndb_cache_check_time;
// Default value for parallelism
static const int parallelism= 0;
@@ -228,13 +229,15 @@ static int ndb_to_mysql_error(const NdbError *err)
inline
-int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans)
+int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans,
+ bool force_release)
{
#ifdef NOT_USED
int m_batch_execute= 0;
if (m_batch_execute)
return 0;
#endif
+ h->release_completed_operations(trans, force_release);
return trans->execute(NdbTransaction::NoCommit,
NdbTransaction::AbortOnError,
h->m_force_send);
@@ -267,13 +270,15 @@ int execute_commit(THD *thd, NdbTransaction *trans)
}
inline
-int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans)
+int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans,
+ bool force_release)
{
#ifdef NOT_USED
int m_batch_execute= 0;
if (m_batch_execute)
return 0;
#endif
+ h->release_completed_operations(trans, force_release);
return trans->execute(NdbTransaction::NoCommit,
NdbTransaction::AO_IgnoreError,
h->m_force_send);
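
Both execute wrappers now take a force_release flag and call release_completed_operations() before handing the batch to NdbTransaction::execute(), so a long-running transaction stops accumulating finished NdbOperation records. A minimal sketch of the calling convention this sets up (define_batch_ops() is a hypothetical stand-in for the real operation-definition code):

    // Callers pass force_release == false by default; only code that
    // knows every previously fetched row has been consumed (the
    // multi-range path at the end of this patch) passes true.
    int run_batch(ha_ndbcluster *h, NdbTransaction *trans)
    {
      define_batch_ops(trans);                 // hypothetical helper
      return execute_no_commit(h, trans, false);
    }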
@@ -290,6 +295,7 @@ Thd_ndb::Thd_ndb()
all= NULL;
stmt= NULL;
error= 0;
+ query_state&= NDB_QUERY_NORMAL;
}
Thd_ndb::~Thd_ndb()
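
query_state is treated as a bit mask throughout this patch. A sketch of the assumed flag declaration (it lives in ha_ndbcluster.h, outside this diff); with NDB_QUERY_NORMAL equal to zero, the query_state&= NDB_QUERY_NORMAL resets above and below simply clear the mask:

    // Assumed declaration, not part of this diff:
    typedef enum ndb_query_state_bits {
      NDB_QUERY_NORMAL=           0, // plain statement; x&= 0 clears all bits
      NDB_QUERY_MULTI_READ_RANGE= 1  // set while inside read_multi_range_first()
    } NDB_QUERY_STATE_BITS;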
@@ -1443,7 +1449,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
if ((res= define_read_attrs(buf, op)))
DBUG_RETURN(res);
- if (execute_no_commit_ie(this,trans) != 0)
+ if (execute_no_commit_ie(this,trans,false) != 0)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
@@ -1490,7 +1496,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
ERR_RETURN(trans->getNdbError());
}
}
- if (execute_no_commit(this,trans) != 0)
+ if (execute_no_commit(this,trans,false) != 0)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
@@ -1630,7 +1636,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record)
}
last= trans->getLastDefinedOperation();
if (first)
- res= execute_no_commit_ie(this,trans);
+ res= execute_no_commit_ie(this,trans,false);
else
{
// Table has no keys
@@ -1679,7 +1685,7 @@ int ha_ndbcluster::unique_index_read(const byte *key,
if ((res= define_read_attrs(buf, op)))
DBUG_RETURN(res);
- if (execute_no_commit_ie(this,trans) != 0)
+ if (execute_no_commit_ie(this,trans,false) != 0)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
@@ -1727,7 +1733,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
*/
if (m_ops_pending && m_blobs_pending)
{
- if (execute_no_commit(this,trans) != 0)
+ if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(ndb_err(trans));
m_ops_pending= 0;
m_blobs_pending= FALSE;
@@ -1759,7 +1765,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
{
if (m_transaction_on)
{
- if (execute_no_commit(this,trans) != 0)
+ if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(-1);
}
else
@@ -2063,7 +2069,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_RETURN(res);
}
- if (execute_no_commit(this,trans) != 0)
+ if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_RETURN(next_result(buf));
@@ -2096,7 +2102,7 @@ int ha_ndbcluster::full_table_scan(byte *buf)
if ((res= define_read_attrs(buf, op)))
DBUG_RETURN(res);
- if (execute_no_commit(this,trans) != 0)
+ if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_PRINT("exit", ("Scan started successfully"));
DBUG_RETURN(next_result(buf));
@@ -2228,7 +2234,7 @@ int ha_ndbcluster::write_row(byte *record)
m_bulk_insert_not_flushed= FALSE;
if (m_transaction_on)
{
- if (execute_no_commit(this,trans) != 0)
+ if (execute_no_commit(this,trans,false) != 0)
{
m_skip_auto_increment= TRUE;
no_uncommitted_rows_execute_failure();
@@ -2428,7 +2434,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
}
// Execute update operation
- if (!cursor && execute_no_commit(this,trans) != 0) {
+ if (!cursor && execute_no_commit(this,trans,false) != 0) {
no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans));
}
@@ -2499,7 +2505,7 @@ int ha_ndbcluster::delete_row(const byte *record)
}
// Execute delete operation
- if (execute_no_commit(this,trans) != 0) {
+ if (execute_no_commit(this,trans,false) != 0) {
no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans));
}
@@ -2928,6 +2934,26 @@ int ha_ndbcluster::close_scan()
NdbScanOperation *cursor= m_active_cursor ? m_active_cursor : m_multi_cursor;
+ if (m_lock_tuple)
+ {
+ /*
+        Lock level m_lock.type is either TL_WRITE_ALLOW_WRITE
+        (SELECT FOR UPDATE) or TL_READ_WITH_SHARED_LOCKS
+        (SELECT ... LOCK IN SHARE MODE) and the row was not
+        explicitly unlocked with an unlock_row() call
+ */
+ NdbOperation *op;
+ // Lock row
+ DBUG_PRINT("info", ("Keeping lock on scanned row"));
+
+ if (!(op= cursor->lockCurrentTuple()))
+ {
+ m_lock_tuple= false;
+ ERR_RETURN(trans->getNdbError());
+ }
+ m_ops_pending++;
+ }
+ m_lock_tuple= false;
if (m_ops_pending)
{
/*
@@ -2935,7 +2961,7 @@ int ha_ndbcluster::close_scan()
deleting/updating transaction before closing the scan
*/
DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending));
- if (execute_no_commit(this,trans) != 0) {
+ if (execute_no_commit(this,trans,false) != 0) {
no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans));
}
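
The new close_scan() block covers scans run under SELECT FOR UPDATE or SELECT ... LOCK IN SHARE MODE: if the last fetched row was never released with unlock_row(), its scan lock is taken over by a separate operation so the lock survives the cursor. A hedged sketch of the underlying NDB API pattern, assuming an open NdbTransaction *trans and an exclusive-mode NdbScanOperation *cursor (row_is_interesting() is hypothetical):

    while (cursor->nextResult(true) == 0)
    {
      if (!row_is_interesting())           // hypothetical predicate
        continue;                          // scan lock drops on next fetch
      if (!cursor->lockCurrentTuple())     // take over the scan lock
        return -1;
      if (trans->execute(NdbTransaction::NoCommit))
        return -1;                         // flush the takeover op to the kernel
    }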
@@ -3345,7 +3371,7 @@ int ha_ndbcluster::end_bulk_insert()
m_bulk_insert_not_flushed= FALSE;
if (m_transaction_on)
{
- if (execute_no_commit(this, trans) != 0)
+ if (execute_no_commit(this, trans,false) != 0)
{
no_uncommitted_rows_execute_failure();
my_errno= error= ndb_err(trans);
@@ -3500,7 +3526,14 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
if (lock_type != F_UNLCK)
{
DBUG_PRINT("info", ("lock_type != F_UNLCK"));
- if (!thd->transaction.on)
+ if (thd->lex->sql_command == SQLCOM_LOAD)
+ {
+ m_transaction_on= FALSE;
+ /* Would be simpler if has_transactions() didn't always say "yes" */
+ thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
+ thd->no_trans_update= TRUE;
+ }
+ else if (!thd->transaction.on)
m_transaction_on= FALSE;
else
m_transaction_on= thd->variables.ndb_use_transactions;
@@ -3518,6 +3551,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
ERR_RETURN(ndb->getNdbError());
no_uncommitted_rows_reset(thd);
thd_ndb->stmt= trans;
+ thd_ndb->query_state&= NDB_QUERY_NORMAL;
trans_register_ha(thd, FALSE, &ndbcluster_hton);
}
else
@@ -3533,6 +3567,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
ERR_RETURN(ndb->getNdbError());
no_uncommitted_rows_reset(thd);
thd_ndb->all= trans;
+ thd_ndb->query_state&= NDB_QUERY_NORMAL;
trans_register_ha(thd, TRUE, &ndbcluster_hton);
/*
@@ -3739,6 +3774,7 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type)
thd_ndb->stmt= trans;
trans_register_ha(thd, FALSE, &ndbcluster_hton);
}
+ thd_ndb->query_state&= NDB_QUERY_NORMAL;
m_active_trans= trans;
// Start of statement
@@ -4147,10 +4183,15 @@ static void ndb_set_fragmentation(NDBTAB &tab, TABLE *form, uint pk_length)
acc_row_size+= 4 + /*safety margin*/ 4;
#endif
ulonglong acc_fragment_size= 512*1024*1024;
+  /*
+   * If not built --with-big-tables, max_rows is only ulong, so widen
+   * it before multiplying to avoid 32-bit overflow (the warning in
+   * that case is misleading, though).
+   */
+ ulonglong big_max_rows = (ulonglong)max_rows;
#if MYSQL_VERSION_ID >= 50100
- no_fragments= (max_rows*acc_row_size)/acc_fragment_size+1;
+ no_fragments= (big_max_rows*acc_row_size)/acc_fragment_size+1;
#else
- no_fragments= ((max_rows*acc_row_size)/acc_fragment_size+1
+ no_fragments= ((big_max_rows*acc_row_size)/acc_fragment_size+1
+1/*correct rounding*/)/2;
#endif
}
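
The big_max_rows widening fixes a real overflow, not just a warning: without --with-big-tables, max_rows is a 32-bit ulong, so max_rows*acc_row_size wraps before the 64-bit division and yields far too few fragments. A self-contained illustration with made-up numbers:

    #include <cstdint>
    #include <cstdio>

    int main()
    {
      uint32_t max_rows= 100000000U;            // 100M rows via MAX_ROWS=
      uint32_t acc_row_size= 64;                // illustrative bytes per row
      uint64_t acc_fragment_size= 512ULL*1024*1024;

      // Broken: the multiply wraps in 32 bits before being widened.
      uint64_t wrapped= (uint64_t)(max_rows*acc_row_size)/acc_fragment_size+1;
      // Fixed: widen first, as big_max_rows does above.
      uint64_t correct= ((uint64_t)max_rows*acc_row_size)/acc_fragment_size+1;

      printf("no_fragments wrapped: %llu\n", (unsigned long long)wrapped); // 4
      printf("no_fragments widened: %llu\n", (unsigned long long)correct); // 12
      return 0;
    }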
@@ -5210,6 +5251,7 @@ bool ndbcluster_init()
pthread_cond_init(&COND_ndb_util_thread, NULL);
+ ndb_cache_check_time = opt_ndb_cache_check_time;
// Create utility thread
pthread_t tmp;
if (pthread_create(&tmp, &connection_attrib, ndb_util_thread_func, 0))
@@ -5986,6 +6028,30 @@ int ha_ndbcluster::write_ndb_file()
DBUG_RETURN(error);
}
+void
+ha_ndbcluster::release_completed_operations(NdbTransaction *trans,
+ bool force_release)
+{
+ if (trans->hasBlobOperation())
+ {
+    /* We are reading/writing BLOB fields;
+       releasing operation records is unsafe
+ */
+ return;
+ }
+ if (!force_release)
+ {
+ if (get_thd_ndb(current_thd)->query_state & NDB_QUERY_MULTI_READ_RANGE)
+ {
+      /* We are batching reads and have not yet consumed all fetched
+         rows; releasing operation records is unsafe
+ */
+ return;
+ }
+ }
+ trans->releaseCompletedOperations();
+}
+
int
ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
KEY_MULTI_RANGE *ranges,
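
release_completed_operations() is what makes the force_release plumbing pay off: every execute(NoCommit) otherwise leaves its finished NdbOperation records attached to the transaction until commit, so a large LOAD DATA or multi-statement transaction grows without bound. The two early returns skip the release while blob buffers or an unconsumed multi-range batch may still reference those records. A usage sketch under those assumptions (define_update_op() is hypothetical):

    // A long scan/update loop now runs at roughly constant memory:
    while (fetch_next(cursor) == 0)
    {
      define_update_op(trans);            // hypothetical helper
      // Frees records from earlier batches first, unless the
      // blob / multi-range guards above veto it.
      if (execute_no_commit(this, trans, false))
        DBUG_RETURN(ndb_err(trans));
    }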
@@ -6000,6 +6066,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
NDB_INDEX_TYPE index_type= get_index_type(active_index);
ulong reclength= table->s->reclength;
NdbOperation* op;
+ Thd_ndb *thd_ndb= get_thd_ndb(current_thd);
if (uses_blob_value(m_retrieve_all_fields))
{
@@ -6013,7 +6080,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
sorted,
buffer));
}
-
+ thd_ndb->query_state|= NDB_QUERY_MULTI_READ_RANGE;
m_disable_multi_read= FALSE;
/**
@@ -6160,7 +6227,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
*/
m_current_multi_operation=
lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation();
- if (!(res= execute_no_commit_ie(this, m_active_trans)))
+ if (!(res= execute_no_commit_ie(this, m_active_trans, true)))
{
m_multi_range_defined= multi_range_curr;
multi_range_curr= ranges;
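
This is the one call site that passes force_release == true: read_multi_range_first() has just defined a fresh batch, and every row from the previous multi-range batch has already been returned to the server layer, so overriding the NDB_QUERY_MULTI_READ_RANGE guard is safe here. The flag's lifecycle, as wired up elsewhere in this patch:

    thd_ndb->query_state|= NDB_QUERY_MULTI_READ_RANGE; // entering multi-range read
    ...
    thd_ndb->query_state&= NDB_QUERY_NORMAL;           // cleared at statement start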