diff options
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r-- | sql/ha_ndbcluster.cc | 1027 |
1 files changed, 655 insertions, 372 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 21056ef4a8f..d499218a8c3 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -20,12 +20,6 @@ NDB Cluster */ -/* - TODO - After CREATE DATABASE gör discover på alla tabeller i den databasen - -*/ - #ifdef __GNUC__ #pragma implementation // gcc: Class implementation @@ -45,8 +39,13 @@ // Default value for parallelism static const int parallelism= 240; +// Default value for max number of transactions +// createable against NDB from this handler +static const int max_transactions = 256; + #define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8 + #define ERR_PRINT(err) \ DBUG_PRINT("error", ("Error: %d message: %s", err.code, err.message)) @@ -64,6 +63,8 @@ typedef NdbDictionary::Dictionary NDBDICT; bool ndbcluster_inited= false; +static Ndb* g_ndb= NULL; + // Handler synchronization pthread_mutex_t ndbcluster_mutex; @@ -95,7 +96,22 @@ static const err_code_mapping err_map[]= { 630, HA_ERR_FOUND_DUPP_KEY }, { 893, HA_ERR_FOUND_DUPP_UNIQUE }, { 721, HA_ERR_TABLE_EXIST }, + { 4244, HA_ERR_TABLE_EXIST }, { 241, HA_ERR_OLD_METADATA }, + + { 266, HA_ERR_LOCK_WAIT_TIMEOUT }, + { 274, HA_ERR_LOCK_WAIT_TIMEOUT }, + { 296, HA_ERR_LOCK_WAIT_TIMEOUT }, + { 297, HA_ERR_LOCK_WAIT_TIMEOUT }, + { 237, HA_ERR_LOCK_WAIT_TIMEOUT }, + + { 623, HA_ERR_RECORD_FILE_FULL }, + { 624, HA_ERR_RECORD_FILE_FULL }, + { 625, HA_ERR_RECORD_FILE_FULL }, + { 826, HA_ERR_RECORD_FILE_FULL }, + { 827, HA_ERR_RECORD_FILE_FULL }, + { 832, HA_ERR_RECORD_FILE_FULL }, + { -1, -1 } }; @@ -144,6 +160,28 @@ int ha_ndbcluster::ndb_err(NdbConnection *trans) /* + Override the default get_error_message in order to add the + error message of NDB + */ + +bool ha_ndbcluster::get_error_message(int error, + String *buf) +{ + DBUG_ENTER("ha_ndbcluster::get_error_message"); + DBUG_PRINT("enter", ("error: %d", error)); + + if (!m_ndb) + DBUG_RETURN(false); + + const NdbError err= m_ndb->getNdbError(error); + bool temporary= err.status==NdbError::TemporaryError; + buf->set(err.message, strlen(err.message), &my_charset_bin); + DBUG_PRINT("exit", ("message: %s, temporary: %d", buf->ptr(), temporary)); + DBUG_RETURN(temporary); +} + + +/* Instruct NDB to set the value of the hidden primary key */ @@ -297,7 +335,7 @@ int ha_ndbcluster::get_metadata(const char *path) const NDBTAB *tab; const void *data, *pack_data; const char **key_name; - uint ndb_columns, mysql_columns, length, pack_length, i; + uint ndb_columns, mysql_columns, length, pack_length; int error; DBUG_ENTER("get_metadata"); DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path)); @@ -353,60 +391,111 @@ int ha_ndbcluster::get_metadata(const char *path) // All checks OK, lets use the table m_table= (void*)tab; - for (i= 0; i < MAX_KEY; i++) - m_indextype[i]= UNDEFINED_INDEX; + DBUG_RETURN(build_index_list()); +} +int ha_ndbcluster::build_index_list() +{ + char *name; + const char *index_name; + static const char* unique_suffix= "$unique"; + uint i, name_len; + DBUG_ENTER("build_index_list"); + // Save information about all known indexes - for (i= 0; i < table->keys; i++) - m_indextype[i] = get_index_type_from_table(i); - - DBUG_RETURN(0); + for (uint i= 0; i < table->keys; i++) + { + NDB_INDEX_TYPE idx_type= get_index_type_from_table(i); + m_indextype[i]= idx_type; + + if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX) + { + index_name= get_index_name(i); + name_len= strlen(index_name)+strlen(unique_suffix)+1; + // Create name for unique index by appending "$unique"; + if (!(name= my_malloc(name_len, MYF(MY_WME)))) + DBUG_RETURN(2); + strxnmov(name, name_len, index_name, unique_suffix, NullS); + m_unique_index_name[i]= name; + DBUG_PRINT("info", ("Created unique index name: %s for index %d", + name, i)); + } + } + DBUG_RETURN(0); } + + /* Decode the type of an index from information provided in table object */ -NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint index_no) const +NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint inx) const { - if (index_no == table->primary_key) - return PRIMARY_KEY_INDEX; + bool is_hash_index= (table->key_info[inx].algorithm == HA_KEY_ALG_HASH); + if (inx == table->primary_key) + return is_hash_index ? PRIMARY_KEY_INDEX : PRIMARY_KEY_ORDERED_INDEX; else - return ((table->key_info[index_no].flags & HA_NOSAME) ? - UNIQUE_INDEX : + return ((table->key_info[inx].flags & HA_NOSAME) ? + (is_hash_index ? UNIQUE_INDEX : UNIQUE_ORDERED_INDEX) : ORDERED_INDEX); } - + void ha_ndbcluster::release_metadata() { + uint i; + DBUG_ENTER("release_metadata"); DBUG_PRINT("enter", ("m_tabname: %s", m_tabname)); m_table= NULL; + // Release index list + for (i= 0; i < MAX_KEY; i++) + { + if (m_unique_index_name[i]) + my_free((char*)m_unique_index_name[i], MYF(0)); + m_unique_index_name[i]= NULL; + } + DBUG_VOID_RETURN; } +NdbCursorOperation::LockMode get_ndb_lock_type(enum thr_lock_type type) +{ + return (type == TL_WRITE_ALLOW_WRITE) ? + NdbCursorOperation::LM_Exclusive : NdbCursorOperation::LM_Read; +} + static const ulong index_type_flags[]= { /* UNDEFINED_INDEX */ 0, /* PRIMARY_KEY_INDEX */ - HA_ONLY_WHOLE_INDEX | - HA_WRONG_ASCII_ORDER | - HA_NOT_READ_PREFIX_LAST, + HA_ONLY_WHOLE_INDEX, + + /* PRIMARY_KEY_ORDERED_INDEX */ + /* + Enable HA_KEYREAD_ONLY when "sorted" indexes are supported, + thus ORDERD BY clauses can be optimized by reading directly + through the index. + */ + // HA_KEYREAD_ONLY | + HA_READ_NEXT | + HA_READ_RANGE, /* UNIQUE_INDEX */ - HA_ONLY_WHOLE_INDEX | - HA_WRONG_ASCII_ORDER | - HA_NOT_READ_PREFIX_LAST, + HA_ONLY_WHOLE_INDEX, + + /* UNIQUE_ORDERED_INDEX */ + HA_READ_NEXT | + HA_READ_RANGE, /* ORDERED_INDEX */ HA_READ_NEXT | - HA_READ_PREV | - HA_NOT_READ_AFTER_KEY + HA_READ_RANGE, }; static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong); @@ -416,6 +505,11 @@ inline const char* ha_ndbcluster::get_index_name(uint idx_no) const return table->keynames.type_names[idx_no]; } +inline const char* ha_ndbcluster::get_unique_index_name(uint idx_no) const +{ + return m_unique_index_name[idx_no]; +} + inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const { DBUG_ASSERT(idx_no < MAX_KEY); @@ -430,9 +524,10 @@ inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const flags depending on the type of the index. */ -inline ulong ha_ndbcluster::index_flags(uint idx_no) const +inline ulong ha_ndbcluster::index_flags(uint idx_no, uint part) const { DBUG_ENTER("index_flags"); + DBUG_PRINT("info", ("idx_no: %d", idx_no)); DBUG_ASSERT(get_index_type_from_table(idx_no) < index_flags_size); DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)]); } @@ -457,6 +552,24 @@ int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key) } +int ha_ndbcluster::set_primary_key_from_old_data(NdbOperation *op, const byte *old_data) +{ + KEY* key_info= table->key_info + table->primary_key; + KEY_PART_INFO* key_part= key_info->key_part; + KEY_PART_INFO* end= key_part+key_info->key_parts; + DBUG_ENTER("set_primary_key_from_old_data"); + + for (; key_part != end; key_part++) + { + Field* field= key_part->field; + if (set_ndb_key(op, field, + key_part->fieldnr-1, old_data+key_part->offset)) + ERR_RETURN(op->getNdbError()); + } + DBUG_RETURN(0); +} + + int ha_ndbcluster::set_primary_key(NdbOperation *op) { DBUG_ENTER("set_primary_key"); @@ -550,7 +663,6 @@ int ha_ndbcluster::unique_index_read(const byte *key, uint key_len, byte *buf) { NdbConnection *trans= m_active_trans; - const char *index_name; NdbIndexOperation *op; THD *thd= current_thd; byte *key_ptr; @@ -560,9 +672,10 @@ int ha_ndbcluster::unique_index_read(const byte *key, DBUG_ENTER("unique_index_read"); DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index)); DBUG_DUMP("key", (char*)key, key_len); + DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index))); - index_name= get_index_name(active_index); - if (!(op= trans->getNdbIndexOperation(index_name, m_tabname)) || + if (!(op= trans->getNdbIndexOperation(get_unique_index_name(active_index), + m_tabname)) || op->readTuple() != 0) ERR_RETURN(trans->getNdbError()); @@ -608,25 +721,64 @@ int ha_ndbcluster::unique_index_read(const byte *key, } /* - Get the next record of a started scan + Get the next record of a started scan. Try to fetch + it locally from NdbApi cached records if possible, + otherwise ask NDB for more. + + NOTE + If this is a update/delete make sure to not contact + NDB before any pending ops have been sent to NDB. + */ inline int ha_ndbcluster::next_result(byte *buf) { + int check; NdbConnection *trans= m_active_trans; NdbResultSet *cursor= m_active_cursor; DBUG_ENTER("next_result"); - - if (cursor->nextResult() == 0) - { - // One more record found - unpack_record(buf); - table->status= 0; - DBUG_RETURN(0); - } + + if (!cursor) + DBUG_RETURN(HA_ERR_END_OF_FILE); + + /* + If this an update or delete, call nextResult with false + to process any records already cached in NdbApi + */ + bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE; + do { + DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb)); + check= cursor->nextResult(contact_ndb); + if (check == 0) + { + // One more record found + DBUG_PRINT("info", ("One more record found")); + unpack_record(buf); + table->status= 0; + DBUG_RETURN(0); + } + else if (check == 1 || check == 2) + { + // 1: No more records + // 2: No more cached records + + /* + Before fetching more rows and releasing lock(s), + all pending update or delete operations should + be sent to NDB + */ + DBUG_PRINT("info", ("ops_pending: %d", ops_pending)); + if (ops_pending && (trans->execute(NoCommit) != 0)) + DBUG_RETURN(ndb_err(trans)); + ops_pending= 0; + + contact_ndb= (check == 2); + } + } while (check == 2); + table->status= STATUS_NOT_FOUND; - if (ndb_err(trans)) - ERR_RETURN(trans->getNdbError()); + if (check == -1) + DBUG_RETURN(ndb_err(trans)); // No more records DBUG_PRINT("info", ("No more records")); @@ -635,155 +787,135 @@ inline int ha_ndbcluster::next_result(byte *buf) /* - Read record(s) from NDB using ordered index scan + Set bounds for a ordered index scan, use key_range */ -int ha_ndbcluster::ordered_index_scan(const byte *key, uint key_len, - byte *buf, - enum ha_rkey_function find_flag) -{ - uint no_fields= table->fields; - uint tot_len, i; - NdbConnection *trans= m_active_trans; - NdbResultSet *cursor= m_active_cursor; - NdbScanOperation *op; - const char *bound_str= NULL; - const char *index_name; - NdbOperation::BoundType bound_type = NdbOperation::BoundEQ; - bool can_be_handled_by_ndb= FALSE; +int ha_ndbcluster::set_bounds(NdbOperation *op, + const key_range *key, + int bound) +{ + uint i, tot_len; byte *key_ptr; - KEY *key_info; - THD* thd = current_thd; - DBUG_ENTER("ordered_index_scan"); - DBUG_PRINT("enter", ("index: %u", active_index)); - DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname)); - - index_name= get_index_name(active_index); - if (!(op= trans->getNdbScanOperation(index_name, m_tabname))) - ERR_RETURN(trans->getNdbError()); - if (!(cursor= op->readTuples(parallelism))) - ERR_RETURN(trans->getNdbError()); - m_active_cursor= cursor; - - switch (find_flag) { - case HA_READ_KEY_EXACT: /* Find first record else error */ - bound_str= "HA_READ_KEY_EXACT"; - bound_type= NdbOperation::BoundEQ; - can_be_handled_by_ndb= TRUE; - break; - case HA_READ_KEY_OR_NEXT: /* Record or next record */ - bound_str= "HA_READ_KEY_OR_NEXT"; - bound_type= NdbOperation::BoundLE; - can_be_handled_by_ndb= TRUE; - break; - case HA_READ_KEY_OR_PREV: /* Record or previous */ - bound_str= "HA_READ_KEY_OR_PREV"; - bound_type= NdbOperation::BoundGE; - can_be_handled_by_ndb= TRUE; - break; - case HA_READ_AFTER_KEY: /* Find next rec. after key-record */ - bound_str= "HA_READ_AFTER_KEY"; - bound_type= NdbOperation::BoundLT; - can_be_handled_by_ndb= TRUE; - break; - case HA_READ_BEFORE_KEY: /* Find next rec. before key-record */ - bound_str= "HA_READ_BEFORE_KEY"; - bound_type= NdbOperation::BoundGT; - can_be_handled_by_ndb= TRUE; - break; - case HA_READ_PREFIX: /* Key which as same prefix */ - bound_str= "HA_READ_PREFIX"; - break; - case HA_READ_PREFIX_LAST: /* Last key with the same prefix */ - bound_str= "HA_READ_PREFIX_LAST"; - break; - case HA_READ_PREFIX_LAST_OR_PREV: - /* Last or prev key with the same prefix */ - bound_str= "HA_READ_PREFIX_LAST_OR_PREV"; - break; - default: - bound_str= "UNKNOWN"; - break; - } - DBUG_PRINT("info", ("find_flag: %s, bound_type: %d," - "can_be_handled_by_ndb: %d", - bound_str, bound_type, can_be_handled_by_ndb)); - if (!can_be_handled_by_ndb) - DBUG_RETURN(1); - + KEY* key_info= table->key_info + active_index; + KEY_PART_INFO* key_part= key_info->key_part; + KEY_PART_INFO* end= key_part+key_info->key_parts; + + DBUG_ENTER("set_bounds"); + DBUG_PRINT("enter", ("bound: %d", bound)); + DBUG_PRINT("enter", ("key_parts: %d", key_info->key_parts)); + DBUG_PRINT("enter", ("key->length: %d", key->length)); + DBUG_PRINT("enter", ("key->flag: %d", key->flag)); + // Set bounds using key data tot_len= 0; - key_ptr= (byte *) key; - key_info= table->key_info + active_index; - for (i= 0; i < key_info->key_parts; i++) + key_ptr= (byte *) key->key; + for (; key_part != end; key_part++) { - Field* field= key_info->key_part[i].field; + Field* field= key_part->field; uint32 field_len= field->pack_length(); - DBUG_PRINT("info", ("Set index bound on %s", + tot_len+= field_len; + + const char* bounds[]= {"LE", "LT", "GE", "GT", "EQ"}; + DBUG_ASSERT(bound >= 0 && bound <= 4); + DBUG_PRINT("info", ("Set Bound%s on %s", + bounds[bound], field->field_name)); DBUG_DUMP("key", (char*)key_ptr, field_len); - + if (op->setBound(field->field_name, - bound_type, + bound, key_ptr, field_len) != 0) ERR_RETURN(op->getNdbError()); key_ptr+= field_len; - tot_len+= field_len; - if (tot_len >= key_len) + + if (tot_len >= key->length) + break; + + /* + Only one bound which is not EQ can be set + so if this bound was not EQ, bail out and make + a best effort attempt + */ + if (bound != NdbOperation::BoundEQ) break; } - - // Define attributes to read - for (i= 0; i < no_fields; i++) + + DBUG_RETURN(0); +} + + +/* + Start ordered index scan in NDB +*/ + +int ha_ndbcluster::ordered_index_scan(const key_range *start_key, + const key_range *end_key, + bool sorted, byte* buf) +{ + NdbConnection *trans= m_active_trans; + NdbResultSet *cursor; + NdbScanOperation *op; + const char *index_name; + + DBUG_ENTER("ordered_index_scan"); + DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted)); + DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname)); + + index_name= get_index_name(active_index); + if (!(op= trans->getNdbScanOperation(index_name, m_tabname))) + ERR_RETURN(trans->getNdbError()); + if (!(cursor= op->readTuples(parallelism, get_ndb_lock_type(m_lock.type)))) + ERR_RETURN(trans->getNdbError()); + m_active_cursor= cursor; + + if (start_key && + set_bounds(op, start_key, + (start_key->flag == HA_READ_KEY_EXACT) ? + NdbOperation::BoundEQ : + (start_key->flag == HA_READ_AFTER_KEY) ? + NdbOperation::BoundLT : + NdbOperation::BoundLE)) + DBUG_RETURN(1); + + if (end_key) { - Field *field= table->field[i]; - if ((thd->query_id == field->query_id) || - (field->flags & PRI_KEY_FLAG)) - { - if (get_ndb_value(op, i, field->ptr)) - ERR_RETURN(op->getNdbError()); - } - else + if (start_key && start_key->flag == HA_READ_KEY_EXACT) { - m_value[i]= NULL; + DBUG_PRINT("info", ("start_key is HA_READ_KEY_EXACT ignoring end_key")); } + else if (set_bounds(op, end_key, + (end_key->flag == HA_READ_AFTER_KEY) ? + NdbOperation::BoundGE : + NdbOperation::BoundGT)) + DBUG_RETURN(1); } - - if (table->primary_key == MAX_KEY) - { - DBUG_PRINT("info", ("Getting hidden key")); - // Scanning table with no primary key - int hidden_no= no_fields; -#ifndef DBUG_OFF - const NDBTAB *tab= (NDBTAB *) m_table; - if (!tab->getColumn(hidden_no)) - DBUG_RETURN(1); -#endif - if (get_ndb_value(op, hidden_no, NULL)) - ERR_RETURN(op->getNdbError()); - } - - if (trans->execute(NoCommit) != 0) - DBUG_RETURN(ndb_err(trans)); - DBUG_PRINT("exit", ("Scan started successfully")); - DBUG_RETURN(next_result(buf)); + DBUG_RETURN(define_read_attrs(buf, op)); } -#if 0 /* - Read record(s) from NDB using full table scan with filter + Start a filtered scan in NDB. + + NOTE + This function is here as an example of how to start a + filtered scan. It should be possible to replace full_table_scan + with this function and make a best effort attempt + at filtering out the irrelevant data by converting the "items" + into interpreted instructions. + This would speed up table scans where there is a limiting WHERE clause + that doesn't match any index in the table. + */ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, byte *buf, enum ha_rkey_function find_flag) { - uint no_fields= table->fields; NdbConnection *trans= m_active_trans; - NdbResultSet *cursor= m_active_cursor; + NdbResultSet *cursor; + NdbScanOperation *op; DBUG_ENTER("filtered_scan"); DBUG_PRINT("enter", ("key_len: %u, index: %u", @@ -791,12 +923,10 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, DBUG_DUMP("key", (char*)key, key_len); DBUG_PRINT("info", ("Starting a new filtered scan on %s", m_tabname)); - NdbScanOperation *op= trans->getNdbScanOperation(m_tabname); - if (!op) + + if (!(op= trans->getNdbScanOperation(m_tabname))) ERR_RETURN(trans->getNdbError()); - - cursor= op->readTuples(parallelism); - if (!cursor) + if (!(cursor= op->readTuples(parallelism, get_ndb_lock_type(m_lock.type)))) ERR_RETURN(trans->getNdbError()); m_active_cursor= cursor; @@ -845,66 +975,49 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, sf.end(); } - // Define attributes to read - for (uint field_no= 0; field_no < no_fields; field_no++) - { - Field *field= table->field[field_no]; - - // Read attribute - DBUG_PRINT("get", ("%d: %s", field_no, field->field_name)); - if (get_ndb_value(op, field_no, field->ptr)) - ERR_RETURN(op->getNdbError()); - } - - if (table->primary_key == MAX_KEY) - { - DBUG_PRINT("info", ("Getting hidden key")); - // Scanning table with no primary key - int hidden_no= no_fields; -#ifndef DBUG_OFF - const NDBTAB *tab= (NDBTAB *) m_table; - if (!tab->getColumn(hidden_no)) - DBUG_RETURN(1); -#endif - if (get_ndb_value(op, hidden_no, NULL)) - ERR_RETURN(op->getNdbError()); - } - - if (trans->execute(NoCommit) != 0) - DBUG_RETURN(ndb_err(trans)); - DBUG_PRINT("exit", ("Scan started successfully")); - DBUG_RETURN(next_result(buf)); + DBUG_RETURN(define_read_attrs(buf, op)); } -#endif /* - Read records from NDB using full table scan + Start full table scan in NDB */ int ha_ndbcluster::full_table_scan(byte *buf) { uint i; - THD *thd= current_thd; - NdbConnection *trans= m_active_trans; NdbResultSet *cursor; NdbScanOperation *op; + NdbConnection *trans= m_active_trans; DBUG_ENTER("full_table_scan"); DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname)); if (!(op=trans->getNdbScanOperation(m_tabname))) ERR_RETURN(trans->getNdbError()); - if (!(cursor= op->readTuples(parallelism))) + if (!(cursor= op->readTuples(parallelism, get_ndb_lock_type(m_lock.type)))) ERR_RETURN(trans->getNdbError()); m_active_cursor= cursor; - + DBUG_RETURN(define_read_attrs(buf, op)); +} + + +inline +int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) +{ + uint i; + THD *thd= current_thd; + NdbConnection *trans= m_active_trans; + + DBUG_ENTER("define_read_attrs"); + // Define attributes to read for (i= 0; i < table->fields; i++) { Field *field= table->field[i]; if ((thd->query_id == field->query_id) || - (field->flags & PRI_KEY_FLAG)) + (field->flags & PRI_KEY_FLAG) || + retrieve_all_fields) { if (get_ndb_value(op, i, field->ptr)) ERR_RETURN(op->getNdbError()); @@ -991,8 +1104,17 @@ int ha_ndbcluster::write_row(byte *record) to NoCommit the transaction between each row. Find out how this is detected! */ - if (trans->execute(NoCommit) != 0) - DBUG_RETURN(ndb_err(trans)); + rows_inserted++; + if ((rows_inserted == rows_to_insert) || + ((rows_inserted % bulk_insert_rows) == 0)) + { + // Send rows to NDB + DBUG_PRINT("info", ("Sending inserts to NDB, "\ + "rows_inserted:%d, bulk_insert_rows: %d", + rows_inserted, bulk_insert_rows)); + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + } DBUG_RETURN(0); } @@ -1039,6 +1161,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) { THD *thd= current_thd; NdbConnection *trans= m_active_trans; + NdbResultSet* cursor= m_active_cursor; NdbOperation *op; uint i; DBUG_ENTER("update_row"); @@ -1047,49 +1170,66 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) if (table->timestamp_on_update_now) update_timestamp(new_data+table->timestamp_on_update_now-1); - if (!(op= trans->getNdbOperation(m_tabname)) || - op->updateTuple() != 0) - ERR_RETURN(trans->getNdbError()); - - if (table->primary_key == MAX_KEY) - { - // This table has no primary key, use "hidden" primary key - DBUG_PRINT("info", ("Using hidden key")); - - // Require that the PK for this record has previously been - // read into m_value - uint no_fields= table->fields; - NdbRecAttr* rec= m_value[no_fields]; - DBUG_ASSERT(rec); - DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH); - - if (set_hidden_key(op, no_fields, rec->aRef())) - ERR_RETURN(op->getNdbError()); - } - else + /* Check for update of primary key and return error */ + if ((table->primary_key != MAX_KEY) && + (key_cmp(table->primary_key, old_data, new_data))) + DBUG_RETURN(HA_ERR_UNSUPPORTED); + + if (cursor) { - /* Check for update of primary key and return error */ - if (key_cmp(table->primary_key, old_data, new_data)) - DBUG_RETURN(HA_ERR_UNSUPPORTED); - - int res; - if ((res= set_primary_key(op, old_data + table->null_bytes))) - DBUG_RETURN(res); + /* + We are scanning records and want to update the record + that was just found, call updateTuple on the cursor + to take over the lock to a new update operation + And thus setting the primary key of the record from + the active record in cursor + */ + DBUG_PRINT("info", ("Calling updateTuple on cursor")); + if (!(op= cursor->updateTuple())) + ERR_RETURN(trans->getNdbError()); + ops_pending++; + } + else + { + if (!(op= trans->getNdbOperation(m_tabname)) || + op->updateTuple() != 0) + ERR_RETURN(trans->getNdbError()); + + if (table->primary_key == MAX_KEY) + { + // This table has no primary key, use "hidden" primary key + DBUG_PRINT("info", ("Using hidden key")); + + // Require that the PK for this record has previously been + // read into m_value + uint no_fields= table->fields; + NdbRecAttr* rec= m_value[no_fields]; + DBUG_ASSERT(rec); + DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH); + + if (set_hidden_key(op, no_fields, rec->aRef())) + ERR_RETURN(op->getNdbError()); + } + else + { + int res; + if ((res= set_primary_key_from_old_data(op, old_data))) + DBUG_RETURN(res); + } } // Set non-key attribute(s) for (i= 0; i < table->fields; i++) { - Field *field= table->field[i]; if ((thd->query_id == field->query_id) && (!(field->flags & PRI_KEY_FLAG)) && set_ndb_value(op, field, i)) ERR_RETURN(op->getNdbError()); } - + // Execute update operation - if (trans->execute(NoCommit) != 0) + if (!cursor && trans->execute(NoCommit) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(0); @@ -1103,39 +1243,61 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) int ha_ndbcluster::delete_row(const byte *record) { NdbConnection *trans= m_active_trans; + NdbResultSet* cursor= m_active_cursor; NdbOperation *op; DBUG_ENTER("delete_row"); statistic_increment(ha_delete_count,&LOCK_status); - if (!(op=trans->getNdbOperation(m_tabname)) || - op->deleteTuple() != 0) - ERR_RETURN(trans->getNdbError()); - - if (table->primary_key == MAX_KEY) + if (cursor) { - // This table has no primary key, use "hidden" primary key - DBUG_PRINT("info", ("Using hidden key")); - uint no_fields= table->fields; - NdbRecAttr* rec= m_value[no_fields]; - DBUG_ASSERT(rec != NULL); + /* + We are scanning records and want to update the record + that was just found, call deleteTuple on the cursor + to take over the lock to a new update operation + And thus setting the primary key of the record from + the active record in cursor + */ + DBUG_PRINT("info", ("Calling deleteTuple on cursor")); + if (cursor->deleteTuple() != 0) + ERR_RETURN(trans->getNdbError()); + ops_pending++; - if (set_hidden_key(op, no_fields, rec->aRef())) - ERR_RETURN(op->getNdbError()); - } - else + // If deleting from cursor, NoCommit will be handled in next_result + DBUG_RETURN(0); + } + else { - int res; - if ((res= set_primary_key(op))) - return res; + + if (!(op=trans->getNdbOperation(m_tabname)) || + op->deleteTuple() != 0) + ERR_RETURN(trans->getNdbError()); + + if (table->primary_key == MAX_KEY) + { + // This table has no primary key, use "hidden" primary key + DBUG_PRINT("info", ("Using hidden key")); + uint no_fields= table->fields; + NdbRecAttr* rec= m_value[no_fields]; + DBUG_ASSERT(rec != NULL); + + if (set_hidden_key(op, no_fields, rec->aRef())) + ERR_RETURN(op->getNdbError()); + } + else + { + int res; + if ((res= set_primary_key(op))) + return res; + } } - + // Execute delete operation if (trans->execute(NoCommit) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(0); } - + /* Unpack a record read from NDB @@ -1223,6 +1385,7 @@ void ha_ndbcluster::print_results() switch (col->getType()) { case NdbDictionary::Column::Blob: + case NdbDictionary::Column::Clob: case NdbDictionary::Column::Undefined: fprintf(DBUG_FILE, "Unknown type: %d", col->getType()); break; @@ -1337,7 +1500,7 @@ int ha_ndbcluster::index_init(uint index) int ha_ndbcluster::index_end() { DBUG_ENTER("index_end"); - DBUG_RETURN(rnd_end()); + DBUG_RETURN(close_scan()); } @@ -1348,28 +1511,12 @@ int ha_ndbcluster::index_read(byte *buf, DBUG_ENTER("index_read"); DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d", active_index, key_len, find_flag)); - - int error= 1; - statistic_increment(ha_read_key_count, &LOCK_status); - switch (get_index_type(active_index)){ - case PRIMARY_KEY_INDEX: - error= pk_read(key, key_len, buf); - break; - - case UNIQUE_INDEX: - error= unique_index_read(key, key_len, buf); - break; - - case ORDERED_INDEX: - error= ordered_index_scan(key, key_len, buf, find_flag); - break; - - default: - case UNDEFINED_INDEX: - break; - } - DBUG_RETURN(error); + key_range start_key; + start_key.key= key; + start_key.length= key_len; + start_key.flag= find_flag; + DBUG_RETURN(read_range_first(&start_key, NULL, false, true)); } @@ -1391,11 +1538,7 @@ int ha_ndbcluster::index_next(byte *buf) int error = 1; statistic_increment(ha_read_next_count,&LOCK_status); - if (get_index_type(active_index) == PRIMARY_KEY_INDEX) - error= HA_ERR_END_OF_FILE; - else - error = next_result(buf); - DBUG_RETURN(error); + DBUG_RETURN(next_result(buf)); } @@ -1423,6 +1566,62 @@ int ha_ndbcluster::index_last(byte *buf) } +int ha_ndbcluster::read_range_first(const key_range *start_key, + const key_range *end_key, + bool eq_range, bool sorted) +{ + KEY* key_info; + int error= 1; + byte* buf= table->record[0]; + DBUG_ENTER("ha_ndbcluster::read_range_first"); + DBUG_PRINT("info", ("eq_range: %d, sorted: %d", eq_range, sorted)); + + if (m_active_cursor) + close_scan(); + + switch (get_index_type(active_index)){ + case PRIMARY_KEY_ORDERED_INDEX: + case PRIMARY_KEY_INDEX: + key_info= table->key_info + active_index; + if (start_key && + start_key->length == key_info->key_length && + start_key->flag == HA_READ_KEY_EXACT) + { + error= pk_read(start_key->key, start_key->length, buf); + DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error); + } + break; + case UNIQUE_ORDERED_INDEX: + case UNIQUE_INDEX: + key_info= table->key_info + active_index; + if (start_key && + start_key->length == key_info->key_length && + start_key->flag == HA_READ_KEY_EXACT) + { + error= unique_index_read(start_key->key, start_key->length, buf); + DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error); + } + break; + default: + break; + } + + + // Start the ordered index scan and fetch the first row + error= ordered_index_scan(start_key, end_key, sorted, + buf); + + DBUG_RETURN(error); +} + + +int ha_ndbcluster::read_range_next() +{ + DBUG_ENTER("ha_ndbcluster::read_range_next"); + DBUG_RETURN(next_result(table->record[0])); +} + + int ha_ndbcluster::rnd_init(bool scan) { NdbResultSet *cursor= m_active_cursor; @@ -1435,19 +1634,23 @@ int ha_ndbcluster::rnd_init(bool scan) DBUG_RETURN(0); } +int ha_ndbcluster::close_scan() +{ + NdbResultSet *cursor= m_active_cursor; + DBUG_ENTER("close_scan"); + + if (!cursor) + DBUG_RETURN(1); + + cursor->close(); + m_active_cursor= NULL; + DBUG_RETURN(0); +} int ha_ndbcluster::rnd_end() { - NdbResultSet *cursor= m_active_cursor; DBUG_ENTER("rnd_end"); - - if (cursor) - { - DBUG_PRINT("info", ("Closing the cursor")); - cursor->close(); - m_active_cursor= NULL; - } - DBUG_RETURN(0); + DBUG_RETURN(close_scan()); } @@ -1455,12 +1658,10 @@ int ha_ndbcluster::rnd_next(byte *buf) { DBUG_ENTER("rnd_next"); statistic_increment(ha_read_rnd_next_count, &LOCK_status); - int error = 1; + if (!m_active_cursor) - error = full_table_scan(buf); - else - error = next_result(buf); - DBUG_RETURN(error); + DBUG_RETURN(full_table_scan(buf)); + DBUG_RETURN(next_result(buf)); } @@ -1654,6 +1855,7 @@ int ha_ndbcluster::extra(enum ha_extra_function operation) where field->query_id is the same as the current query id */ DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_ALL_COLS")); + retrieve_all_fields= TRUE; break; case HA_EXTRA_PREPARE_FOR_DELETE: DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_DELETE")); @@ -1679,6 +1881,53 @@ int ha_ndbcluster::extra(enum ha_extra_function operation) DBUG_RETURN(0); } +/* + Start of an insert, remember number of rows to be inserted, it will + be used in write_row and get_autoincrement to send an optimal number + of rows in each roundtrip to the server + + SYNOPSIS + rows number of rows to insert, 0 if unknown + +*/ + +void ha_ndbcluster::start_bulk_insert(ha_rows rows) +{ + int bytes, batch; + const NDBTAB *tab= (NDBTAB *) m_table; + + DBUG_ENTER("start_bulk_insert"); + DBUG_PRINT("enter", ("rows: %d", rows)); + + rows_inserted= 0; + rows_to_insert= rows; + + /* + Calculate how many rows that should be inserted + per roundtrip to NDB. This is done in order to minimize the + number of roundtrips as much as possible. However performance will + degrade if too many bytes are inserted, thus it's limited by this + calculation. + */ + const int bytesperbatch = 8192; + bytes= 12 + tab->getRowSizeInBytes() + 4 * tab->getNoOfColumns(); + batch= bytesperbatch/bytes; + batch= batch == 0 ? 1 : batch; + DBUG_PRINT("info", ("batch: %d, bytes: %d", batch, bytes)); + bulk_insert_rows= batch; + + DBUG_VOID_RETURN; +} + +/* + End of an insert + */ +int ha_ndbcluster::end_bulk_insert() +{ + DBUG_ENTER("end_bulk_insert"); + DBUG_RETURN(0); +} + int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size) { @@ -1708,7 +1957,7 @@ const char **ha_ndbcluster::bas_ext() const double ha_ndbcluster::scan_time() { - return rows2double(records/3); + return rows2double(records*1000); } @@ -1740,6 +1989,8 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd, m_lock.type=lock_type; } *to++= &m_lock; + + DBUG_PRINT("exit", ("lock_type: %d", lock_type)); DBUG_RETURN(to); } @@ -1788,6 +2039,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) if (lock_type != F_UNLCK) { + DBUG_PRINT("info", ("lock_type != F_UNLCK")); if (!thd->transaction.ndb_lock_count++) { PRINT_OPTION_FLAGS(thd); @@ -1853,10 +2105,14 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) (NdbConnection*)thd->transaction.all.ndb_tid: (NdbConnection*)thd->transaction.stmt.ndb_tid; DBUG_ASSERT(m_active_trans); - + + // Start of transaction + retrieve_all_fields= FALSE; + ops_pending= 0; } else { + DBUG_PRINT("info", ("lock_type == F_UNLCK")); if (!--thd->transaction.ndb_lock_count) { DBUG_PRINT("trans", ("Last external_lock")); @@ -1904,6 +2160,10 @@ int ha_ndbcluster::start_stmt(THD *thd) thd->transaction.stmt.ndb_tid= trans; } m_active_trans= trans; + + // Start of statement + retrieve_all_fields= FALSE; + ops_pending= 0; DBUG_RETURN(error); } @@ -2041,9 +2301,8 @@ int ha_ndbcluster::create(const char *name, NdbDictionary::Column::Type ndb_type; NDBCOL col; uint pack_length, length, i; - int res; const void *data, *pack_data; - const char **key_name= form->keynames.type_names; + const char **key_names= form->keynames.type_names; char name2[FN_HEADLEN]; DBUG_ENTER("create"); @@ -2086,13 +2345,11 @@ int ha_ndbcluster::create(const char *name, col.setPrimaryKey(field->flags & PRI_KEY_FLAG); if (field->flags & AUTO_INCREMENT_FLAG) { - DBUG_PRINT("info", ("Found auto_increment key")); col.setAutoIncrement(TRUE); - ulonglong value = info->auto_increment_value ? - info->auto_increment_value -1 : - (ulonglong) 0; - DBUG_PRINT("info", ("initial value=%ld", value)); -// col.setInitialAutIncValue(value); + ulonglong value= info->auto_increment_value ? + info->auto_increment_value -1 : (ulonglong) 0; + DBUG_PRINT("info", ("Autoincrement key, initial: %d", value)); + col.setAutoIncrementInitialValue(value); } else col.setAutoIncrement(false); @@ -2132,65 +2389,86 @@ int ha_ndbcluster::create(const char *name, DBUG_PRINT("info", ("Table %s/%s created successfully", m_dbname, m_tabname)); - // Fetch table from NDB, check that it exists - const NDBTAB *tab2= dict->getTable(m_tabname); - if (tab2 == NULL) - { - const NdbError err= dict->getNdbError(); - ERR_PRINT(err); - my_errno= ndb_to_mysql_error(&err); + if ((my_errno= build_index_list())) DBUG_RETURN(my_errno); - } // Create secondary indexes - for (i= 0; i < form->keys; i++) + KEY* key_info= form->key_info; + const char** key_name= key_names; + for (i= 0; i < form->keys; i++, key_info++, key_name++) { - DBUG_PRINT("info", ("Found index %u: %s", i, key_name[i])); - if (i == form->primary_key) - { - DBUG_PRINT("info", ("Skipping it, PK already created")); - continue; - } - - DBUG_PRINT("info", ("Creating index %u: %s", i, key_name[i])); - res= create_index(key_name[i], - form->key_info + i); - switch(res){ - case 0: - // OK + int error= 0; + DBUG_PRINT("info", ("Index %u: %s", i, *key_name)); + + switch (get_index_type_from_table(i)){ + + case PRIMARY_KEY_INDEX: + // Do nothing, already created + break; + case PRIMARY_KEY_ORDERED_INDEX: + error= create_ordered_index(*key_name, key_info); + break; + case UNIQUE_ORDERED_INDEX: + if (!(error= create_ordered_index(*key_name, key_info))) + error= create_unique_index(get_unique_index_name(i), key_info); + break; + case UNIQUE_INDEX: + error= create_unique_index(get_unique_index_name(i), key_info); + break; + case ORDERED_INDEX: + error= create_ordered_index(*key_name, key_info); break; default: + DBUG_ASSERT(false); + break; + } + + if (error) + { DBUG_PRINT("error", ("Failed to create index %u", i)); drop_table(); - my_errno= res; - goto err_end; + my_errno= error; + break; } } -err_end: DBUG_RETURN(my_errno); } +int ha_ndbcluster::create_ordered_index(const char *name, + KEY *key_info) +{ + DBUG_ENTER("create_ordered_index"); + DBUG_RETURN(create_index(name, key_info, false)); +} + +int ha_ndbcluster::create_unique_index(const char *name, + KEY *key_info) +{ + + DBUG_ENTER("create_unique_index"); + DBUG_RETURN(create_index(name, key_info, true)); +} + + /* Create an index in NDB Cluster */ int ha_ndbcluster::create_index(const char *name, - KEY *key_info){ + KEY *key_info, + bool unique) +{ NdbDictionary::Dictionary *dict= m_ndb->getDictionary(); KEY_PART_INFO *key_part= key_info->key_part; KEY_PART_INFO *end= key_part + key_info->key_parts; DBUG_ENTER("create_index"); DBUG_PRINT("enter", ("name: %s ", name)); - - // Check that an index with the same name do not already exist - if (dict->getIndex(name, m_tabname)) - ERR_RETURN(dict->getNdbError()); - + NdbDictionary::Index ndb_index(name); - if (key_info->flags & HA_NOSAME) + if (unique) ndb_index.setType(NdbDictionary::Index::UniqueHashIndex); else { @@ -2321,10 +2599,10 @@ int ndbcluster_drop_database(const char *path) longlong ha_ndbcluster::get_auto_increment() -{ - // NOTE If number of values to be inserted is known - // the autoincrement cache could be used here - Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname); +{ + int cache_size = rows_to_insert ? rows_to_insert : 32; + Uint64 auto_value= + m_ndb->getAutoIncrementValue(m_tabname, cache_size); return (longlong)auto_value; } @@ -2340,16 +2618,18 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): m_ndb(NULL), m_table(NULL), m_table_flags(HA_REC_NOT_IN_SEQ | - HA_KEYPOS_TO_RNDPOS | HA_NOT_EXACT_COUNT | - HA_NO_WRITE_DELAYED | HA_NO_PREFIX_CHAR_KEYS | - HA_NO_BLOBS | - HA_DROP_BEFORE_CREATE | - HA_NOT_READ_AFTER_KEY), - m_use_write(false) + HA_NO_BLOBS), + m_use_write(false), + retrieve_all_fields(FALSE), + rows_to_insert(0), + rows_inserted(0), + bulk_insert_rows(1024), + ops_pending(0) { - + int i; + DBUG_ENTER("ha_ndbcluster"); m_tabname[0]= '\0'; @@ -2360,6 +2640,12 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): records= 100; block_size= 1024; + for (i= 0; i < MAX_KEY; i++) + { + m_indextype[i]= UNDEFINED_INDEX; + m_unique_index_name[i]= NULL; + } + DBUG_VOID_RETURN; } @@ -2445,7 +2731,7 @@ Ndb* ha_ndbcluster::seize_ndb() #else ndb= new Ndb(""); #endif - if (ndb->init(NDB_MAX_TRANSACTIONS) != 0) + if (ndb->init(max_transactions) != 0) { ERR_PRINT(ndb->getNdbError()); /* @@ -2500,11 +2786,6 @@ int ha_ndbcluster::check_ndb_connection() } m_ndb= (Ndb*)thd->transaction.ndb; m_ndb->setDatabaseName(m_dbname); - if (m_ndb->waitUntilReady() != 0) - { - DBUG_PRINT("error", ("Ndb was not ready")); - DBUG_RETURN(3); - } DBUG_RETURN(0); } @@ -2561,7 +2842,6 @@ int ndbcluster_discover(const char *dbname, const char *name, DBUG_RETURN(0); } -static Ndb* g_ndb= NULL; #ifdef USE_DISCOVER_ON_STARTUP /* @@ -2653,6 +2933,14 @@ bool ndbcluster_end() DBUG_RETURN(0); } +void ndbcluster_print_error(int error) +{ + DBUG_ENTER("ndbcluster_print_error"); + TABLE tab; + tab.table_name = NULL; + ha_ndbcluster error_handler(&tab); + error_handler.print_error(error, MYF(0)); +} /* Set m_tabname from full pathname to table file @@ -2751,32 +3039,27 @@ ha_rows ha_ndbcluster::records_in_range(uint inx, key_range *min_key, key_range *max_key) { - ha_rows records= 10; /* Good guess when you don't know anything */ KEY *key_info= table->key_info + inx; uint key_length= key_info->key_length; + NDB_INDEX_TYPE idx_type= get_index_type(inx); DBUG_ENTER("records_in_range"); DBUG_PRINT("enter", ("inx: %u", inx)); - DBUG_DUMP("start_key", min_key->key, min_key->length); - DBUG_DUMP("end_key", max_key->key, max_key->length); - DBUG_PRINT("enter", ("start_search_flag: %u end_search_flag: %u", - min_key->flag, max_key->flag)); - /* - Check that start_key_len is equal to - the length of the used index and - prevent partial scan/read of hash indexes by returning HA_POS_ERROR - */ - NDB_INDEX_TYPE idx_type= get_index_type(inx); - if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) && - min_key->length < key_length) - { - DBUG_PRINT("warning", ("Tried to use index which required" - "full key length: %d, HA_POS_ERROR", - key_length)); - records= HA_POS_ERROR; - } - DBUG_RETURN(records); + // Prevent partial read of hash indexes by returning HA_POS_ERROR + if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) && + ((min_key && min_key->length < key_length) || + (max_key && max_key->length < key_length))) + DBUG_RETURN(HA_POS_ERROR); + + // Read from hash index with full key + // This is a "const" table which returns only one record! + if ((idx_type != ORDERED_INDEX) && + ((min_key && min_key->length == key_length) || + (max_key && max_key->length == key_length))) + DBUG_RETURN(1); + + DBUG_RETURN(10); /* Good guess when you don't know anything */ } |