diff options
author | unknown <magnus@neptunus.(none)> | 2004-05-10 15:06:07 +0200 |
---|---|---|
committer | unknown <magnus@neptunus.(none)> | 2004-05-10 15:06:07 +0200 |
commit | d6676b0f575b8ec1605fa3cf125b09b073b0a10c (patch) | |
tree | 0d71e76929ab093a751afd94dc84e6ef0407fd38 /sql/ha_ndbcluster.cc | |
parent | 075eb33889a3193c6364d1edfa11e1d784fd3cb5 (diff) | |
parent | e6bec02e3de1aa7e5da4cc8ee5f6467eec47ebcc (diff) | |
download | mariadb-git-d6676b0f575b8ec1605fa3cf125b09b073b0a10c.tar.gz |
Merged ha_ndbcluster.cc
sql/ha_ndbcluster.h:
Auto merged
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r-- | sql/ha_ndbcluster.cc | 329 |
1 files changed, 205 insertions, 124 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 9ec7df44a6f..2c474c161d5 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -418,6 +418,13 @@ void ha_ndbcluster::release_metadata() DBUG_VOID_RETURN; } + +inline int ha_ndbcluster::get_ndb_lock_type() +{ + return (int)((m_lock.type == TL_WRITE_ALLOW_WRITE) ? + NdbCursorOperation::LM_Exclusive : NdbCursorOperation::LM_Read); +} + static const ulong index_type_flags[]= { /* UNDEFINED_INDEX */ @@ -652,22 +659,61 @@ int ha_ndbcluster::unique_index_read(const byte *key, } /* - Get the next record of a started scan + Get the next record of a started scan. Try to fetch + it locally from NdbApi cached records if possible, + otherwise ask NDB for more. + + NOTE + If this is a update/delete make sure to not contact + NDB before any pending ops have been sent to NDB. + */ inline int ha_ndbcluster::next_result(byte *buf) { + int check; NdbConnection *trans= m_active_trans; NdbResultSet *cursor= m_active_cursor; DBUG_ENTER("next_result"); - - if (cursor->nextResult() == 0) - { - // One more record found - unpack_record(buf); - table->status= 0; - DBUG_RETURN(0); - } + + if (!cursor) + DBUG_RETURN(HA_ERR_END_OF_FILE); + + /* + If this an update or delete, call nextResult with false + to process any records already cached in NdbApi + */ + bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE; + do { + DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb)); + check= cursor->nextResult(contact_ndb); + if (check == 0) + { + // One more record found + DBUG_PRINT("info", ("One more record found")); + unpack_record(buf); + table->status= 0; + DBUG_RETURN(0); + } + else if (check == 1 || check == 2) + { + // 1: No more records + // 2: No more cached records + + /* + Before fetching more rows and releasing lock(s), + all pending update or delete operations should + be sent to NDB + */ + DBUG_PRINT("info", ("ops_pending: %d", ops_pending)); + if (ops_pending && trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + ops_pending= 0; + + contact_ndb= (check == 2); + } + } while (check == 2); + table->status= STATUS_NOT_FOUND; if (ndb_err(trans)) ERR_RETURN(trans->getNdbError()); @@ -739,28 +785,28 @@ int ha_ndbcluster::set_bounds(NdbOperation *op, /* - Read record(s) from NDB using ordered index scan + Start ordered index scan in NDB */ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, const key_range *end_key, bool sorted, byte* buf) { - uint no_fields= table->fields; - uint i; NdbConnection *trans= m_active_trans; - NdbResultSet *cursor= m_active_cursor; + NdbResultSet *cursor; NdbScanOperation *op; const char *index_name; - THD* thd = current_thd; + DBUG_ENTER("ordered_index_scan"); - DBUG_PRINT("enter", ("index: %u", active_index)); + DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted)); DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname)); index_name= get_index_name(active_index); if (!(op= trans->getNdbScanOperation(index_name, m_tabname))) ERR_RETURN(trans->getNdbError()); - if (!(cursor= op->readTuples(parallelism))) + if (!(cursor= + op->readTuples(parallelism, + (NdbCursorOperation::LockMode)get_ndb_lock_type()))) ERR_RETURN(trans->getNdbError()); m_active_cursor= cursor; @@ -817,22 +863,31 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, if (trans->execute(NoCommit) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_PRINT("exit", ("Scan started successfully")); - DBUG_RETURN(next_result(buf)); + DBUG_RETURN(define_read_attrs(buf, op)); } -#if 0 /* - Read record(s) from NDB using full table scan with filter + Start a filtered scan in NDB. + + NOTE + This function is here as an example of how to start a + filtered scan. It should be possible to replace full_table_scan + with this function and make a best effort attempt + at filtering out the irrelevant data by converting the "items" + into interpreted instructions. + This would speed up table scans where there is a limiting WHERE clause + that doesn't match any index in the table. + */ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, byte *buf, enum ha_rkey_function find_flag) { - uint no_fields= table->fields; NdbConnection *trans= m_active_trans; - NdbResultSet *cursor= m_active_cursor; + NdbResultSet *cursor; + NdbScanOperation *op; DBUG_ENTER("filtered_scan"); DBUG_PRINT("enter", ("key_len: %u, index: %u", @@ -840,12 +895,12 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, DBUG_DUMP("key", (char*)key, key_len); DBUG_PRINT("info", ("Starting a new filtered scan on %s", m_tabname)); - NdbScanOperation *op= trans->getNdbScanOperation(m_tabname); - if (!op) + + if (!(op= trans->getNdbScanOperation(m_tabname))) ERR_RETURN(trans->getNdbError()); - - cursor= op->readTuples(parallelism); - if (!cursor) + if (!(cursor= + op->readTuples(parallelism, + (NdbCursorOperation::LockMode)get_ndb_lock_type()))) ERR_RETURN(trans->getNdbError()); m_active_cursor= cursor; @@ -894,60 +949,44 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, sf.end(); } - // Define attributes to read - for (uint field_no= 0; field_no < no_fields; field_no++) - { - Field *field= table->field[field_no]; - - // Read attribute - DBUG_PRINT("get", ("%d: %s", field_no, field->field_name)); - if (get_ndb_value(op, field_no, field->ptr)) - ERR_RETURN(op->getNdbError()); - } - - if (table->primary_key == MAX_KEY) - { - DBUG_PRINT("info", ("Getting hidden key")); - // Scanning table with no primary key - int hidden_no= no_fields; -#ifndef DBUG_OFF - const NDBTAB *tab= (NDBTAB *) m_table; - if (!tab->getColumn(hidden_no)) - DBUG_RETURN(1); -#endif - if (get_ndb_value(op, hidden_no, NULL)) - ERR_RETURN(op->getNdbError()); - } - - if (trans->execute(NoCommit) != 0) - DBUG_RETURN(ndb_err(trans)); - DBUG_PRINT("exit", ("Scan started successfully")); - DBUG_RETURN(next_result(buf)); + DBUG_RETURN(define_read_attrs(buf, op)); } -#endif /* - Read records from NDB using full table scan + Start full table scan in NDB */ int ha_ndbcluster::full_table_scan(byte *buf) { uint i; - THD *thd= current_thd; - NdbConnection *trans= m_active_trans; NdbResultSet *cursor; NdbScanOperation *op; + NdbConnection *trans= m_active_trans; DBUG_ENTER("full_table_scan"); DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname)); if (!(op=trans->getNdbScanOperation(m_tabname))) ERR_RETURN(trans->getNdbError()); - if (!(cursor= op->readTuples(parallelism))) + if (!(cursor= + op->readTuples(parallelism, + (NdbCursorOperation::LockMode)get_ndb_lock_type()))) ERR_RETURN(trans->getNdbError()); m_active_cursor= cursor; - + DBUG_RETURN(define_read_attrs(buf, op)); +} + + +inline +int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) +{ + uint i; + THD *thd= current_thd; + NdbConnection *trans= m_active_trans; + + DBUG_ENTER("define_read_attrs"); + // Define attributes to read for (i= 0; i < table->fields; i++) { @@ -1042,7 +1081,8 @@ int ha_ndbcluster::write_row(byte *record) Find out how this is detected! */ rows_inserted++; - if ((rows_inserted % bulk_insert_rows) == 0) + if ((rows_inserted == rows_to_insert) || + ((rows_inserted % bulk_insert_rows) == 0)) { // Send rows to NDB DBUG_PRINT("info", ("Sending inserts to NDB, "\ @@ -1097,6 +1137,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) { THD *thd= current_thd; NdbConnection *trans= m_active_trans; + NdbResultSet* cursor= m_active_cursor; NdbOperation *op; uint i; DBUG_ENTER("update_row"); @@ -1105,49 +1146,66 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) if (table->timestamp_on_update_now) update_timestamp(new_data+table->timestamp_on_update_now-1); - if (!(op= trans->getNdbOperation(m_tabname)) || - op->updateTuple() != 0) - ERR_RETURN(trans->getNdbError()); - - if (table->primary_key == MAX_KEY) - { - // This table has no primary key, use "hidden" primary key - DBUG_PRINT("info", ("Using hidden key")); - - // Require that the PK for this record has previously been - // read into m_value - uint no_fields= table->fields; - NdbRecAttr* rec= m_value[no_fields]; - DBUG_ASSERT(rec); - DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH); - - if (set_hidden_key(op, no_fields, rec->aRef())) - ERR_RETURN(op->getNdbError()); - } - else + /* Check for update of primary key and return error */ + if ((table->primary_key != MAX_KEY) && + (key_cmp(table->primary_key, old_data, new_data))) + DBUG_RETURN(HA_ERR_UNSUPPORTED); + + if (cursor) { - /* Check for update of primary key and return error */ - if (key_cmp(table->primary_key, old_data, new_data)) - DBUG_RETURN(HA_ERR_UNSUPPORTED); - - int res; - if ((res= set_primary_key(op, old_data + table->null_bytes))) - DBUG_RETURN(res); + /* + We are scanning records and want to update the record + that was just found, call updateTuple on the cursor + to take over the lock to a new update operation + And thus setting the primary key of the record from + the active record in cursor + */ + DBUG_PRINT("info", ("Calling updateTuple on cursor")); + if (!(op= cursor->updateTuple())) + ERR_RETURN(trans->getNdbError()); + ops_pending++; + } + else + { + if (!(op= trans->getNdbOperation(m_tabname)) || + op->updateTuple() != 0) + ERR_RETURN(trans->getNdbError()); + + if (table->primary_key == MAX_KEY) + { + // This table has no primary key, use "hidden" primary key + DBUG_PRINT("info", ("Using hidden key")); + + // Require that the PK for this record has previously been + // read into m_value + uint no_fields= table->fields; + NdbRecAttr* rec= m_value[no_fields]; + DBUG_ASSERT(rec); + DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH); + + if (set_hidden_key(op, no_fields, rec->aRef())) + ERR_RETURN(op->getNdbError()); + } + else + { + int res; + if ((res= set_primary_key(op, old_data + table->null_bytes))) + DBUG_RETURN(res); + } } // Set non-key attribute(s) for (i= 0; i < table->fields; i++) { - Field *field= table->field[i]; if ((thd->query_id == field->query_id) && (!(field->flags & PRI_KEY_FLAG)) && set_ndb_value(op, field, i)) ERR_RETURN(op->getNdbError()); } - + // Execute update operation - if (trans->execute(NoCommit) != 0) + if (!cursor && trans->execute(NoCommit) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(0); @@ -1161,39 +1219,61 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) int ha_ndbcluster::delete_row(const byte *record) { NdbConnection *trans= m_active_trans; + NdbResultSet* cursor= m_active_cursor; NdbOperation *op; DBUG_ENTER("delete_row"); statistic_increment(ha_delete_count,&LOCK_status); - if (!(op=trans->getNdbOperation(m_tabname)) || - op->deleteTuple() != 0) - ERR_RETURN(trans->getNdbError()); - - if (table->primary_key == MAX_KEY) + if (cursor) { - // This table has no primary key, use "hidden" primary key - DBUG_PRINT("info", ("Using hidden key")); - uint no_fields= table->fields; - NdbRecAttr* rec= m_value[no_fields]; - DBUG_ASSERT(rec != NULL); + /* + We are scanning records and want to update the record + that was just found, call deleteTuple on the cursor + to take over the lock to a new update operation + And thus setting the primary key of the record from + the active record in cursor + */ + DBUG_PRINT("info", ("Calling deleteTuple on cursor")); + if (cursor->deleteTuple() != 0) + ERR_RETURN(trans->getNdbError()); + ops_pending++; - if (set_hidden_key(op, no_fields, rec->aRef())) - ERR_RETURN(op->getNdbError()); - } - else + // If deleting from cursor, NoCommit will be handled in next_result + DBUG_RETURN(0); + } + else { - int res; - if ((res= set_primary_key(op))) - return res; + + if (!(op=trans->getNdbOperation(m_tabname)) || + op->deleteTuple() != 0) + ERR_RETURN(trans->getNdbError()); + + if (table->primary_key == MAX_KEY) + { + // This table has no primary key, use "hidden" primary key + DBUG_PRINT("info", ("Using hidden key")); + uint no_fields= table->fields; + NdbRecAttr* rec= m_value[no_fields]; + DBUG_ASSERT(rec != NULL); + + if (set_hidden_key(op, no_fields, rec->aRef())) + ERR_RETURN(op->getNdbError()); + } + else + { + int res; + if ((res= set_primary_key(op))) + return res; + } } - + // Execute delete operation if (trans->execute(NoCommit) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(0); } - + /* Unpack a record read from NDB @@ -1481,11 +1561,7 @@ int ha_ndbcluster::index_next(byte *buf) int error = 1; statistic_increment(ha_read_next_count,&LOCK_status); - if (!m_active_cursor) - error= HA_ERR_END_OF_FILE; - else - error = next_result(buf); - DBUG_RETURN(error); + DBUG_RETURN(next_result(buf)); } @@ -1519,7 +1595,7 @@ int ha_ndbcluster::read_range_first(const key_range *start_key, { KEY* key_info; int error= 1; - byte* buf = table->record[0]; + byte* buf= table->record[0]; DBUG_ENTER("ha_ndbcluster::read_range_first"); DBUG_PRINT("info", ("sorted: %d", sorted)); @@ -1548,6 +1624,7 @@ int ha_ndbcluster::read_range_first(const key_range *start_key, DBUG_RETURN(error); } + int ha_ndbcluster::read_range_next(bool eq_range) { DBUG_ENTER("ha_ndbcluster::read_range_next"); @@ -1587,12 +1664,10 @@ int ha_ndbcluster::rnd_next(byte *buf) { DBUG_ENTER("rnd_next"); statistic_increment(ha_read_rnd_next_count, &LOCK_status); - int error = 1; + if (!m_active_cursor) - error = full_table_scan(buf); - else - error = next_result(buf); - DBUG_RETURN(error); + DBUG_RETURN(full_table_scan(buf)); + DBUG_RETURN(next_result(buf)); } @@ -1920,6 +1995,8 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd, m_lock.type=lock_type; } *to++= &m_lock; + + DBUG_PRINT("exit", ("lock_type: %d", lock_type)); DBUG_RETURN(to); } @@ -2034,8 +2111,9 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) (NdbConnection*)thd->transaction.stmt.ndb_tid; DBUG_ASSERT(m_active_trans); + // Start of transaction retrieve_all_fields= FALSE; - + ops_pending= 0; } else { @@ -2087,7 +2165,9 @@ int ha_ndbcluster::start_stmt(THD *thd) } m_active_trans= trans; + // Start of statement retrieve_all_fields= FALSE; + ops_pending= 0; DBUG_RETURN(error); } @@ -2568,7 +2648,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): retrieve_all_fields(FALSE), rows_to_insert(0), rows_inserted(0), - bulk_insert_rows(1024) + bulk_insert_rows(1024), + ops_pending(0) { int i; |