path: root/sql/ha_ndbcluster.cc
author     unknown <magnus@neptunus.(none)>  2004-05-10 15:06:07 +0200
committer  unknown <magnus@neptunus.(none)>  2004-05-10 15:06:07 +0200
commit     d6676b0f575b8ec1605fa3cf125b09b073b0a10c (patch)
tree       0d71e76929ab093a751afd94dc84e6ef0407fd38 /sql/ha_ndbcluster.cc
parent     075eb33889a3193c6364d1edfa11e1d784fd3cb5 (diff)
parent     e6bec02e3de1aa7e5da4cc8ee5f6467eec47ebcc (diff)
download   mariadb-git-d6676b0f575b8ec1605fa3cf125b09b073b0a10c.tar.gz
Merged ha_ndbcluster.cc
sql/ha_ndbcluster.h: Auto merged
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r--  sql/ha_ndbcluster.cc  329
1 files changed, 205 insertions, 124 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 9ec7df44a6f..2c474c161d5 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -418,6 +418,13 @@ void ha_ndbcluster::release_metadata()
DBUG_VOID_RETURN;
}
+
+inline int ha_ndbcluster::get_ndb_lock_type()
+{
+ return (int)((m_lock.type == TL_WRITE_ALLOW_WRITE) ?
+ NdbCursorOperation::LM_Exclusive : NdbCursorOperation::LM_Read);
+}
+
static const ulong index_type_flags[]=
{
/* UNDEFINED_INDEX */
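
The get_ndb_lock_type() helper added above maps the table lock chosen in store_lock() to the NDB scan lock mode: a scan opened under TL_WRITE_ALLOW_WRITE (one that may update or delete the rows it returns) asks for exclusive row locks, everything else reads with shared locks. A minimal standalone sketch of that mapping, using mock enums as stand-ins for thr_lock_type and NdbCursorOperation::LockMode (not the real headers):

    // Minimal model of the mapping above; the enums are stand-ins for
    // thr_lock_type and NdbCursorOperation::LockMode.
    #include <cstdio>

    enum MockLockType    { TL_READ, TL_WRITE_ALLOW_WRITE };
    enum MockNdbLockMode { LM_Read, LM_Exclusive };

    static MockNdbLockMode scan_lock_mode(MockLockType type)
    {
      // A scan that may update or delete the rows it returns needs
      // exclusive row locks; plain reads can use shared read locks.
      return (type == TL_WRITE_ALLOW_WRITE) ? LM_Exclusive : LM_Read;
    }

    int main()
    {
      std::printf("read scan:  %d\n", scan_lock_mode(TL_READ));
      std::printf("write scan: %d\n", scan_lock_mode(TL_WRITE_ALLOW_WRITE));
      return 0;
    }
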
@@ -652,22 +659,61 @@ int ha_ndbcluster::unique_index_read(const byte *key,
}
/*
- Get the next record of a started scan
+ Get the next record of a started scan. Try to fetch
+ it locally from NdbApi cached records if possible,
+ otherwise ask NDB for more.
+
+ NOTE
+  If this is an update or delete, make sure not to contact
+  NDB before any pending operations have been sent to it.
+
*/
inline int ha_ndbcluster::next_result(byte *buf)
{
+ int check;
NdbConnection *trans= m_active_trans;
NdbResultSet *cursor= m_active_cursor;
DBUG_ENTER("next_result");
-
- if (cursor->nextResult() == 0)
- {
- // One more record found
- unpack_record(buf);
- table->status= 0;
- DBUG_RETURN(0);
- }
+
+ if (!cursor)
+ DBUG_RETURN(HA_ERR_END_OF_FILE);
+
+ /*
+    If this is an update or delete, call nextResult with false
+ to process any records already cached in NdbApi
+ */
+ bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE;
+ do {
+ DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
+ check= cursor->nextResult(contact_ndb);
+ if (check == 0)
+ {
+ // One more record found
+ DBUG_PRINT("info", ("One more record found"));
+ unpack_record(buf);
+ table->status= 0;
+ DBUG_RETURN(0);
+ }
+ else if (check == 1 || check == 2)
+ {
+ // 1: No more records
+ // 2: No more cached records
+
+ /*
+ Before fetching more rows and releasing lock(s),
+ all pending update or delete operations should
+ be sent to NDB
+ */
+ DBUG_PRINT("info", ("ops_pending: %d", ops_pending));
+ if (ops_pending && trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ ops_pending= 0;
+
+ contact_ndb= (check == 2);
+ }
+ } while (check == 2);
+
table->status= STATUS_NOT_FOUND;
if (ndb_err(trans))
ERR_RETURN(trans->getNdbError());
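
The rewritten next_result() drains rows that NdbApi has already buffered locally (nextResult(false)) before asking NDB for another batch, and an updating or deleting scan never contacts NDB while cursor operations are still queued. A self-contained sketch of that loop, with a mock cursor standing in for NdbResultSet and the same return codes as above (0 = row, 1 = end of scan, 2 = local cache empty):

    #include <cstdio>
    #include <queue>

    // Mock stand-in for NdbResultSet: 0 = row available, 1 = scan finished,
    // 2 = local cache exhausted (a new batch is fetched only if contact_ndb).
    struct MockCursor {
      std::queue<int> cached;
      int batches_left;
      int nextResult(bool contact_ndb) {
        if (!cached.empty()) { cached.pop(); return 0; }
        if (batches_left == 0) return 1;
        if (!contact_ndb) return 2;      // caller must flush pending ops first
        --batches_left;
        for (int i = 0; i < 3; i++) cached.push(i);  // pretend a batch arrived
        cached.pop();
        return 0;
      }
    };

    int main() {
      MockCursor cursor{{}, 2};
      bool updating_scan = true;           // like TL_WRITE_ALLOW_WRITE
      bool contact_ndb = !updating_scan;   // serve cached rows first if updating
      int ops_pending = 1;                 // e.g. one queued cursor update
      int check, rows = 0;
      do {
        check = cursor.nextResult(contact_ndb);
        if (check == 0)
          rows++;                          // would unpack_record(buf) here
        else if (check == 1 || check == 2) {
          // flush queued updates/deletes before fetching more rows
          if (ops_pending) { std::printf("execute(NoCommit)\n"); ops_pending = 0; }
          contact_ndb = (check == 2);
        }
      } while (check == 0 || check == 2);
      std::printf("rows seen: %d\n", rows);
      return 0;
    }
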
@@ -739,28 +785,28 @@ int ha_ndbcluster::set_bounds(NdbOperation *op,
/*
- Read record(s) from NDB using ordered index scan
+ Start ordered index scan in NDB
*/
int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
const key_range *end_key,
bool sorted, byte* buf)
{
- uint no_fields= table->fields;
- uint i;
NdbConnection *trans= m_active_trans;
- NdbResultSet *cursor= m_active_cursor;
+ NdbResultSet *cursor;
NdbScanOperation *op;
const char *index_name;
- THD* thd = current_thd;
+
DBUG_ENTER("ordered_index_scan");
- DBUG_PRINT("enter", ("index: %u", active_index));
+ DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted));
DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
index_name= get_index_name(active_index);
if (!(op= trans->getNdbScanOperation(index_name, m_tabname)))
ERR_RETURN(trans->getNdbError());
- if (!(cursor= op->readTuples(parallelism)))
+ if (!(cursor=
+ op->readTuples(parallelism,
+ (NdbCursorOperation::LockMode)get_ndb_lock_type())))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
@@ -817,22 +863,31 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_PRINT("exit", ("Scan started successfully"));
- DBUG_RETURN(next_result(buf));
+ DBUG_RETURN(define_read_attrs(buf, op));
}
-#if 0
/*
- Read record(s) from NDB using full table scan with filter
+ Start a filtered scan in NDB.
+
+ NOTE
+ This function is here as an example of how to start a
+ filtered scan. It should be possible to replace full_table_scan
+  with this function and make a best-effort attempt at filtering
+  out the irrelevant data by converting the "items" into
+  interpreted instructions.
+ This would speed up table scans where there is a limiting WHERE clause
+ that doesn't match any index in the table.
+
*/
int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
byte *buf,
enum ha_rkey_function find_flag)
{
- uint no_fields= table->fields;
NdbConnection *trans= m_active_trans;
- NdbResultSet *cursor= m_active_cursor;
+ NdbResultSet *cursor;
+ NdbScanOperation *op;
DBUG_ENTER("filtered_scan");
DBUG_PRINT("enter", ("key_len: %u, index: %u",
@@ -840,12 +895,12 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
DBUG_DUMP("key", (char*)key, key_len);
DBUG_PRINT("info", ("Starting a new filtered scan on %s",
m_tabname));
- NdbScanOperation *op= trans->getNdbScanOperation(m_tabname);
- if (!op)
+
+ if (!(op= trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError());
-
- cursor= op->readTuples(parallelism);
- if (!cursor)
+ if (!(cursor=
+ op->readTuples(parallelism,
+ (NdbCursorOperation::LockMode)get_ndb_lock_type())))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
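
The NOTE above describes filtered_scan() as an example of converting WHERE "items" into interpreted instructions; the sf.end() call visible at the top of the next hunk closes the scan filter that carries them. A rough standalone model of the idea, with a mock filter class standing in for the real NDB scan-filter API (whose exact calls are not shown in this diff):

    #include <cstdio>
    #include <vector>

    // MockScanFilter only mimics a begin/eq/end call pattern; it is not the
    // real NDB scan-filter class used by the code below.
    struct MockScanFilter {
      void begin()                        { std::printf("filter: AND(\n"); }
      void eq(int col_no, unsigned value) { std::printf("  col%d == %u\n", col_no, value); }
      void end()                          { std::printf(")\n"); }
    };

    // Stand-in for a parsed "col = constant" WHERE item.
    struct EqItem { int col_no; unsigned value; };

    int main() {
      std::vector<EqItem> items = { {0, 42}, {3, 7} };  // WHERE col0 = 42 AND col3 = 7
      MockScanFilter sf;
      sf.begin();                      // group the conditions
      for (const EqItem &it : items)
        sf.eq(it.col_no, it.value);    // each item becomes one interpreted condition
      sf.end();                        // finish the filter before starting the scan
      return 0;
    }
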
@@ -894,60 +949,44 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
sf.end();
}
- // Define attributes to read
- for (uint field_no= 0; field_no < no_fields; field_no++)
- {
- Field *field= table->field[field_no];
-
- // Read attribute
- DBUG_PRINT("get", ("%d: %s", field_no, field->field_name));
- if (get_ndb_value(op, field_no, field->ptr))
- ERR_RETURN(op->getNdbError());
- }
-
- if (table->primary_key == MAX_KEY)
- {
- DBUG_PRINT("info", ("Getting hidden key"));
- // Scanning table with no primary key
- int hidden_no= no_fields;
-#ifndef DBUG_OFF
- const NDBTAB *tab= (NDBTAB *) m_table;
- if (!tab->getColumn(hidden_no))
- DBUG_RETURN(1);
-#endif
- if (get_ndb_value(op, hidden_no, NULL))
- ERR_RETURN(op->getNdbError());
- }
-
- if (trans->execute(NoCommit) != 0)
- DBUG_RETURN(ndb_err(trans));
- DBUG_PRINT("exit", ("Scan started successfully"));
- DBUG_RETURN(next_result(buf));
+ DBUG_RETURN(define_read_attrs(buf, op));
}
-#endif
/*
- Read records from NDB using full table scan
+ Start full table scan in NDB
*/
int ha_ndbcluster::full_table_scan(byte *buf)
{
uint i;
- THD *thd= current_thd;
- NdbConnection *trans= m_active_trans;
NdbResultSet *cursor;
NdbScanOperation *op;
+ NdbConnection *trans= m_active_trans;
DBUG_ENTER("full_table_scan");
DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
if (!(op=trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError());
- if (!(cursor= op->readTuples(parallelism)))
+ if (!(cursor=
+ op->readTuples(parallelism,
+ (NdbCursorOperation::LockMode)get_ndb_lock_type())))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
-
+ DBUG_RETURN(define_read_attrs(buf, op));
+}
+
+
+inline
+int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
+{
+ uint i;
+ THD *thd= current_thd;
+ NdbConnection *trans= m_active_trans;
+
+ DBUG_ENTER("define_read_attrs");
+
// Define attributes to read
for (i= 0; i < table->fields; i++)
{
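
The attribute-definition code that used to be repeated per scan (see the lines removed from filtered_scan above) is factored into define_read_attrs(), which all three scan starters now return through; the hunk is cut off just after the field loop begins. A sketch of what the shared helper plausibly does, reassembled from the removed lines; the committed body may differ in details such as which fields it skips:

    inline
    int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
    {
      uint i;
      NdbConnection *trans= m_active_trans;
      DBUG_ENTER("define_read_attrs");

      // Define attributes to read
      for (i= 0; i < table->fields; i++)
      {
        Field *field= table->field[i];
        DBUG_PRINT("get", ("%d: %s", i, field->field_name));
        if (get_ndb_value(op, i, field->ptr))
          ERR_RETURN(op->getNdbError());
      }

      if (table->primary_key == MAX_KEY)
      {
        // Scanning a table with no primary key: also read the hidden key
        DBUG_PRINT("info", ("Getting hidden key"));
        if (get_ndb_value(op, table->fields, NULL))
          ERR_RETURN(op->getNdbError());
      }

      // Start the scan and hand the first fetch over to next_result
      if (trans->execute(NoCommit) != 0)
        DBUG_RETURN(ndb_err(trans));
      DBUG_PRINT("exit", ("Scan started successfully"));
      DBUG_RETURN(next_result(buf));
    }
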
@@ -1042,7 +1081,8 @@ int ha_ndbcluster::write_row(byte *record)
Find out how this is detected!
*/
rows_inserted++;
- if ((rows_inserted % bulk_insert_rows) == 0)
+ if ((rows_inserted == rows_to_insert) ||
+ ((rows_inserted % bulk_insert_rows) == 0))
{
// Send rows to NDB
DBUG_PRINT("info", ("Sending inserts to NDB, "\
@@ -1097,6 +1137,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
{
THD *thd= current_thd;
NdbConnection *trans= m_active_trans;
+ NdbResultSet* cursor= m_active_cursor;
NdbOperation *op;
uint i;
DBUG_ENTER("update_row");
@@ -1105,49 +1146,66 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if (table->timestamp_on_update_now)
update_timestamp(new_data+table->timestamp_on_update_now-1);
- if (!(op= trans->getNdbOperation(m_tabname)) ||
- op->updateTuple() != 0)
- ERR_RETURN(trans->getNdbError());
-
- if (table->primary_key == MAX_KEY)
- {
- // This table has no primary key, use "hidden" primary key
- DBUG_PRINT("info", ("Using hidden key"));
-
- // Require that the PK for this record has previously been
- // read into m_value
- uint no_fields= table->fields;
- NdbRecAttr* rec= m_value[no_fields];
- DBUG_ASSERT(rec);
- DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
-
- if (set_hidden_key(op, no_fields, rec->aRef()))
- ERR_RETURN(op->getNdbError());
- }
- else
+ /* Check for update of primary key and return error */
+ if ((table->primary_key != MAX_KEY) &&
+ (key_cmp(table->primary_key, old_data, new_data)))
+ DBUG_RETURN(HA_ERR_UNSUPPORTED);
+
+ if (cursor)
{
- /* Check for update of primary key and return error */
- if (key_cmp(table->primary_key, old_data, new_data))
- DBUG_RETURN(HA_ERR_UNSUPPORTED);
-
- int res;
- if ((res= set_primary_key(op, old_data + table->null_bytes)))
- DBUG_RETURN(res);
+ /*
+      We are scanning records and want to update the record
+      that was just found. Calling updateTuple on the cursor
+      transfers the lock to a new update operation and sets
+      that operation's primary key from the active record in
+      the cursor.
+ */
+ DBUG_PRINT("info", ("Calling updateTuple on cursor"));
+ if (!(op= cursor->updateTuple()))
+ ERR_RETURN(trans->getNdbError());
+ ops_pending++;
+ }
+ else
+ {
+ if (!(op= trans->getNdbOperation(m_tabname)) ||
+ op->updateTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // This table has no primary key, use "hidden" primary key
+ DBUG_PRINT("info", ("Using hidden key"));
+
+ // Require that the PK for this record has previously been
+ // read into m_value
+ uint no_fields= table->fields;
+ NdbRecAttr* rec= m_value[no_fields];
+ DBUG_ASSERT(rec);
+ DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
+
+ if (set_hidden_key(op, no_fields, rec->aRef()))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ int res;
+ if ((res= set_primary_key(op, old_data + table->null_bytes)))
+ DBUG_RETURN(res);
+ }
}
// Set non-key attribute(s)
for (i= 0; i < table->fields; i++)
{
-
Field *field= table->field[i];
if ((thd->query_id == field->query_id) &&
(!(field->flags & PRI_KEY_FLAG)) &&
set_ndb_value(op, field, i))
ERR_RETURN(op->getNdbError());
}
-
+
// Execute update operation
- if (trans->execute(NoCommit) != 0)
+ if (!cursor && trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_RETURN(0);
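
update_row() now has two paths: with an open scan cursor the update is defined through cursor->updateTuple() and only counted in ops_pending (note the !cursor guard on the execute call above), while a key-based update still executes immediately with NoCommit. A standalone model of the two paths:

    #include <cstdio>

    // Models the choice above: a positioned update taken over from an open
    // scan cursor is only queued, a key-based update executes right away.
    struct MockHandler {
      bool cursor_open = false;
      int  ops_pending = 0;

      void update_row() {
        if (cursor_open) {
          // like cursor->updateTuple(): reuse the scanned row's lock, defer execution
          ops_pending++;
          std::printf("queued cursor update, ops_pending=%d\n", ops_pending);
        } else {
          // like getNdbOperation() + updateTuple() + execute(NoCommit)
          std::printf("executed key-based update immediately\n");
        }
      }
    };

    int main() {
      MockHandler h;
      h.update_row();          // no scan open: sent to NDB at once
      h.cursor_open = true;
      h.update_row();          // scanning: flushed later by next_result
      return 0;
    }
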
@@ -1161,39 +1219,61 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
int ha_ndbcluster::delete_row(const byte *record)
{
NdbConnection *trans= m_active_trans;
+ NdbResultSet* cursor= m_active_cursor;
NdbOperation *op;
DBUG_ENTER("delete_row");
statistic_increment(ha_delete_count,&LOCK_status);
- if (!(op=trans->getNdbOperation(m_tabname)) ||
- op->deleteTuple() != 0)
- ERR_RETURN(trans->getNdbError());
-
- if (table->primary_key == MAX_KEY)
+ if (cursor)
{
- // This table has no primary key, use "hidden" primary key
- DBUG_PRINT("info", ("Using hidden key"));
- uint no_fields= table->fields;
- NdbRecAttr* rec= m_value[no_fields];
- DBUG_ASSERT(rec != NULL);
+ /*
+      We are scanning records and want to delete the record
+      that was just found. Calling deleteTuple on the cursor
+      transfers the lock to a new delete operation and sets
+      that operation's primary key from the active record in
+      the cursor.
+ */
+ DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
+ if (cursor->deleteTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+ ops_pending++;
- if (set_hidden_key(op, no_fields, rec->aRef()))
- ERR_RETURN(op->getNdbError());
- }
- else
+ // If deleting from cursor, NoCommit will be handled in next_result
+ DBUG_RETURN(0);
+ }
+ else
{
- int res;
- if ((res= set_primary_key(op)))
- return res;
+
+ if (!(op=trans->getNdbOperation(m_tabname)) ||
+ op->deleteTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // This table has no primary key, use "hidden" primary key
+ DBUG_PRINT("info", ("Using hidden key"));
+ uint no_fields= table->fields;
+ NdbRecAttr* rec= m_value[no_fields];
+ DBUG_ASSERT(rec != NULL);
+
+ if (set_hidden_key(op, no_fields, rec->aRef()))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ int res;
+ if ((res= set_primary_key(op)))
+ return res;
+ }
}
-
+
// Execute delete operation
if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_RETURN(0);
}
-
+
/*
Unpack a record read from NDB
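
Taken together with the next_result() change earlier, a scanning DELETE now works in three steps: nextResult(false) serves a row from the local cache, cursor->deleteTuple() queues the delete and bumps ops_pending, and the next call into next_result() sends everything with execute(NoCommit) before more rows are fetched or locks released. A compact standalone model of that flow:

    #include <cstdio>

    // Deletes are queued while rows come from the local cache and are flushed
    // in one execute(NoCommit) just before more rows would be fetched from NDB.
    struct MockScanDelete {
      int cached_rows = 3;   // rows already buffered locally
      int ops_pending = 0;   // queued cursor deletes

      bool next_row() {                    // like next_result(buf)
        if (cached_rows > 0) { cached_rows--; return true; }
        if (ops_pending) {                 // flush before contacting NDB again
          std::printf("execute(NoCommit): sending %d queued deletes\n", ops_pending);
          ops_pending = 0;
        }
        return false;                      // pretend the scan is exhausted
      }
      void delete_current() { ops_pending++; }   // like cursor->deleteTuple()
    };

    int main() {
      MockScanDelete scan;
      while (scan.next_row())
        scan.delete_current();             // DELETE every scanned row
      return 0;
    }
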
@@ -1481,11 +1561,7 @@ int ha_ndbcluster::index_next(byte *buf)
int error = 1;
statistic_increment(ha_read_next_count,&LOCK_status);
- if (!m_active_cursor)
- error= HA_ERR_END_OF_FILE;
- else
- error = next_result(buf);
- DBUG_RETURN(error);
+ DBUG_RETURN(next_result(buf));
}
@@ -1519,7 +1595,7 @@ int ha_ndbcluster::read_range_first(const key_range *start_key,
{
KEY* key_info;
int error= 1;
- byte* buf = table->record[0];
+ byte* buf= table->record[0];
DBUG_ENTER("ha_ndbcluster::read_range_first");
DBUG_PRINT("info", ("sorted: %d", sorted));
@@ -1548,6 +1624,7 @@ int ha_ndbcluster::read_range_first(const key_range *start_key,
DBUG_RETURN(error);
}
+
int ha_ndbcluster::read_range_next(bool eq_range)
{
DBUG_ENTER("ha_ndbcluster::read_range_next");
@@ -1587,12 +1664,10 @@ int ha_ndbcluster::rnd_next(byte *buf)
{
DBUG_ENTER("rnd_next");
statistic_increment(ha_read_rnd_next_count, &LOCK_status);
- int error = 1;
+
if (!m_active_cursor)
- error = full_table_scan(buf);
- else
- error = next_result(buf);
- DBUG_RETURN(error);
+ DBUG_RETURN(full_table_scan(buf));
+ DBUG_RETURN(next_result(buf));
}
@@ -1920,6 +1995,8 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
m_lock.type=lock_type;
}
*to++= &m_lock;
+
+ DBUG_PRINT("exit", ("lock_type: %d", lock_type));
DBUG_RETURN(to);
}
@@ -2034,8 +2111,9 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
(NdbConnection*)thd->transaction.stmt.ndb_tid;
DBUG_ASSERT(m_active_trans);
+ // Start of transaction
retrieve_all_fields= FALSE;
-
+ ops_pending= 0;
}
else
{
@@ -2087,7 +2165,9 @@ int ha_ndbcluster::start_stmt(THD *thd)
}
m_active_trans= trans;
+ // Start of statement
retrieve_all_fields= FALSE;
+ ops_pending= 0;
DBUG_RETURN(error);
}
@@ -2568,7 +2648,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
retrieve_all_fields(FALSE),
rows_to_insert(0),
rows_inserted(0),
- bulk_insert_rows(1024)
+ bulk_insert_rows(1024),
+ ops_pending(0)
{
int i;