Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r--   sql/ha_ndbcluster.cc   1758
1 file changed, 1248 insertions(+), 510 deletions(-)
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 1837500d8f7..d59ab919862 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -20,7 +20,6 @@
   NDB Cluster
 */
-
 #ifdef __GNUC__
 #pragma implementation                          // gcc: Class implementation
 #endif
@@ -33,9 +32,6 @@
 #include <ndbapi/NdbApi.hpp>
 #include <ndbapi/NdbScanFilter.hpp>
 
-#define USE_DISCOVER_ON_STARTUP
-//#define USE_NDB_POOL
-
 // Default value for parallelism
 static const int parallelism= 240;
 
@@ -49,11 +45,13 @@ static const ha_rows autoincrement_prefetch= 32;
 // connectstring to cluster if given by mysqld
 const char *ndbcluster_connectstring= 0;
 
+static const char *ha_ndb_ext=".ndb";
+
 #define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
 
 #define ERR_PRINT(err) \
-  DBUG_PRINT("error", ("Error: %d  message: %s", err.code, err.message))
+  DBUG_PRINT("error", ("%d  message: %s", err.code, err.message))
 
 #define ERR_RETURN(err) \
 { \
@@ -63,13 +61,14 @@ const char *ndbcluster_connectstring= 0;
 
 // Typedefs for long names
 typedef NdbDictionary::Column NDBCOL;
-typedef NdbDictionary::Table NDBTAB;
+typedef NdbDictionary::Table  NDBTAB;
 typedef NdbDictionary::Index  NDBINDEX;
 typedef NdbDictionary::Dictionary NDBDICT;
 
-bool ndbcluster_inited= false;
+bool ndbcluster_inited= FALSE;
 
 static Ndb* g_ndb= NULL;
+static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
 
 // Handler synchronization
 pthread_mutex_t ndbcluster_mutex;
@@ -86,6 +85,16 @@ static int packfrm(const void *data, uint len, const void **pack_data, uint *pac
 static int unpackfrm(const void **data, uint *len,
                      const void* pack_data);
 
+static int ndb_get_table_statistics(Ndb*, const char *,
+                                    Uint64* rows, Uint64* commits);
+
+
+/*
+  Dummy buffer to read zero pack_length fields
+  which are mapped to 1 char
+*/
+static byte dummy_buf[1];
+
 /*
   Error handling functions
 */
@@ -103,7 +112,9 @@ static const err_code_mapping err_map[]=
   { 893, HA_ERR_FOUND_DUPP_UNIQUE },
   { 721, HA_ERR_TABLE_EXIST },
   { 4244, HA_ERR_TABLE_EXIST },
-  { 241, HA_ERR_OLD_METADATA },
+
+  { 709, HA_ERR_NO_SUCH_TABLE },
+  { 284, HA_ERR_NO_SUCH_TABLE },
 
   { 266, HA_ERR_LOCK_WAIT_TIMEOUT },
   { 274, HA_ERR_LOCK_WAIT_TIMEOUT },
@@ -118,6 +129,8 @@ static const err_code_mapping err_map[]=
   { 827, HA_ERR_RECORD_FILE_FULL },
   { 832, HA_ERR_RECORD_FILE_FULL },
 
+  { 0, 1 },
+
   { -1, -1 }
 };
 
@@ -127,13 +140,158 @@ static int ndb_to_mysql_error(const NdbError *err)
   uint i;
   for (i=0 ; err_map[i].ndb_err != err->code ; i++)
   {
-    if (err_map[i].my_err == -1)
+    if (err_map[i].my_err == -1){
+      // Push the NDB error message as warning
+      push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+                          err->code, err->message, "NDB");
      return err->code;
+    }
  }
  return err_map[i].my_err;
 }
 
+
+inline
+int execute_no_commit(ha_ndbcluster *h, NdbConnection *trans)
+{
+  int m_batch_execute= 0;
+#ifdef NOT_USED
+  if (m_batch_execute)
+    return 0;
+#endif
+  return trans->execute(NoCommit,AbortOnError,1);
+}
+
+inline
+int execute_commit(ha_ndbcluster *h, NdbConnection *trans)
+{
+  int m_batch_execute= 0;
+#ifdef NOT_USED
+  if (m_batch_execute)
+    return 0;
+#endif
+  return trans->execute(Commit,AbortOnError,1);
+}
+
+inline
+int execute_no_commit_ie(ha_ndbcluster *h, NdbConnection *trans)
+{
+  int m_batch_execute= 0;
+#ifdef NOT_USED
+  if (m_batch_execute)
+    return 0;
+#endif
+  return trans->execute(NoCommit,IgnoreError,1);
+}
+
+/*
+  Place holder for ha_ndbcluster thread specific data
+*/
+
+Thd_ndb::Thd_ndb()
+{
+  ndb= new Ndb(g_ndb_cluster_connection, "");
+  lock_count= 0;
+  count= 0;
+  error= 0;
+}
+
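
The new `{ 0, 1 }` entry and the warning push change how unmapped errors surface: any NDB code the table does not know keeps its raw value, now with the NDB message attached as a warning. A minimal standalone model of the sentinel-terminated lookup (the handler codes here are made up, not the server's real HA_ERR_* values):

```cpp
#include <cstdio>

// Standalone model of the err_map lookup above: the table is scanned until
// the NDB code matches; the {-1, -1} sentinel always matches last, so
// unmapped codes fall through unchanged.
struct err_code_mapping { int ndb_err; int my_err; };

static const err_code_mapping err_map[]=
{
  { 893, 121 },   // stand-in for HA_ERR_FOUND_DUPP_UNIQUE
  { 709, 155 },   // stand-in for HA_ERR_NO_SUCH_TABLE
  { 0,   1   },   // NDB "success" must never map to a handler success code
  { -1,  -1  }    // sentinel: unmapped errors are passed through
};

static int ndb_to_mysql_error_model(int ndb_code)
{
  int i;
  for (i= 0; err_map[i].ndb_err != ndb_code; i++)
  {
    if (err_map[i].my_err == -1)
      return ndb_code;   // here the real handler also pushes a warning
  }
  return err_map[i].my_err;
}

int main()
{
  printf("%d\n", ndb_to_mysql_error_model(893));   // 121, mapped
  printf("%d\n", ndb_to_mysql_error_model(4711));  // 4711, passed through
  return 0;
}
```
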
+Thd_ndb::~Thd_ndb()
+{
+  if (ndb)
+    delete ndb;
+}
+
+/*
+ * manage uncommitted insert/deletes during transactio to get records correct
+ */
+
+struct Ndb_table_local_info {
+  int no_uncommitted_rows_count;
+  ulong last_count;
+  ha_rows records;
+};
+
+void ha_ndbcluster::set_rec_per_key()
+{
+  DBUG_ENTER("ha_ndbcluster::get_status_const");
+  for (uint i=0 ; i < table->keys ; i++)
+  {
+    table->key_info[i].rec_per_key[table->key_info[i].key_parts-1]= 1;
+  }
+  DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::records_update()
+{
+  DBUG_ENTER("ha_ndbcluster::records_update");
+  struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
+  DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
+                      ((const NDBTAB *)m_table)->getTableId(),
+                      info->no_uncommitted_rows_count));
+  //  if (info->records == ~(ha_rows)0)
+  {
+    Uint64 rows;
+    if(ndb_get_table_statistics(m_ndb, m_tabname, &rows, 0) == 0){
+      info->records= rows;
+    }
+  }
+  {
+    THD *thd= current_thd;
+    if (((Thd_ndb*)(thd->transaction.thd_ndb))->error)
+      info->no_uncommitted_rows_count= 0;
+  }
+  records= info->records+ info->no_uncommitted_rows_count;
+  DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::no_uncommitted_rows_execute_failure()
+{
+  DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_execute_failure");
+  THD *thd= current_thd;
+  ((Thd_ndb*)(thd->transaction.thd_ndb))->error= 1;
+  DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::no_uncommitted_rows_init(THD *thd)
+{
+  DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_init");
+  struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
+  Thd_ndb *thd_ndb= (Thd_ndb *)thd->transaction.thd_ndb;
+  if (info->last_count != thd_ndb->count)
+  {
+    info->last_count = thd_ndb->count;
+    info->no_uncommitted_rows_count= 0;
+    info->records= ~(ha_rows)0;
+    DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
+                        ((const NDBTAB *)m_table)->getTableId(),
+                        info->no_uncommitted_rows_count));
+  }
+  DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::no_uncommitted_rows_update(int c)
+{
+  DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_update");
+  struct Ndb_table_local_info *info=
+    (struct Ndb_table_local_info *)m_table_info;
+  info->no_uncommitted_rows_count+= c;
+  DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
+                      ((const NDBTAB *)m_table)->getTableId(),
+                      info->no_uncommitted_rows_count));
+  DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::no_uncommitted_rows_reset(THD *thd)
+{
+  DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_reset");
+  ((Thd_ndb*)(thd->transaction.thd_ndb))->count++;
+  ((Thd_ndb*)(thd->transaction.thd_ndb))->error= 0;
+  DBUG_VOID_RETURN;
+}
+
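
These helpers maintain an approximate `records` value without a cluster round trip per query: committed statistics plus a per-table delta of uncommitted inserts/deletes, discarded when the per-connection statement generation (`count`/`last_count`) moves on or an execute fails. A self-contained sketch of that bookkeeping, with illustrative names rather than the real server types:

```cpp
#include <cstdint>
#include <cstdio>

// Standalone model of the no_uncommitted_rows_* bookkeeping above.
// Each statement bumps a per-connection generation counter; a table's
// delta is only trusted while its snapshot of that counter is current.
struct ThdState  { uint64_t generation= 0; bool error= false; };
struct TableInfo { uint64_t last_generation= 0; long uncommitted= 0; };

void statement_start(ThdState &thd) { thd.generation++; thd.error= false; }

void table_init(TableInfo &t, const ThdState &thd)
{
  if (t.last_generation != thd.generation)  // stale: old statement's delta
  {
    t.last_generation= thd.generation;
    t.uncommitted= 0;
  }
}

// Delta is +1 per insert, -1 per delete; dropped entirely after a failure.
long estimated_rows(const TableInfo &t, const ThdState &thd, long committed)
{
  return committed + (thd.error ? 0 : t.uncommitted);
}

int main()
{
  ThdState thd; TableInfo t;
  statement_start(thd); table_init(t, thd);
  t.uncommitted += 3;                           // three pending inserts
  printf("%ld\n", estimated_rows(t, thd, 100)); // prints 103
  return 0;
}
```
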
 /*
   Take care of the error that occured in NDB
 
@@ -142,12 +300,11 @@ static int ndb_to_mysql_error(const NdbError *err)
   # The mapped error code
 */
 
+
 int ha_ndbcluster::ndb_err(NdbConnection *trans)
 {
   int res;
   const NdbError err= trans->getNdbError();
-  if (!err.code)
-    return 0;                   // Don't log things to DBUG log if no error
   DBUG_ENTER("ndb_err");
 
   ERR_PRINT(err);
@@ -157,6 +314,7 @@ int ha_ndbcluster::ndb_err(NdbConnection *trans)
     NDBDICT *dict= m_ndb->getDictionary();
     DBUG_PRINT("info", ("invalidateTable %s", m_tabname));
     dict->invalidateTable(m_tabname);
+    table->version=0L;          /* Free when thread is ready */
     break;
   }
   default:
@@ -183,10 +341,11 @@ bool ha_ndbcluster::get_error_message(int error,
   DBUG_ENTER("ha_ndbcluster::get_error_message");
   DBUG_PRINT("enter", ("error: %d", error));
 
-  if (!m_ndb)
-    DBUG_RETURN(false);
+  Ndb *ndb= ((Thd_ndb*)current_thd->transaction.thd_ndb)->ndb;
+  if (!ndb)
+    DBUG_RETURN(FALSE);
 
-  const NdbError err= m_ndb->getNdbError(error);
+  const NdbError err= ndb->getNdbError(error);
   bool temporary= err.status==NdbError::TemporaryError;
   buf->set(err.message, strlen(err.message), &my_charset_bin);
   DBUG_PRINT("exit", ("message: %s, temporary: %d", buf->ptr(), temporary));
@@ -196,7 +355,8 @@ bool ha_ndbcluster::get_error_message(int error,
 
 /*
   Check if type is supported by NDB.
-  TODO Use this once, not in every operation
+  TODO Use this once in open(), not in every operation
+
 */
 static inline bool ndb_supported_type(enum_field_types type)
 {
@@ -224,12 +384,12 @@ static inline bool ndb_supported_type(enum_field_types type)
   case MYSQL_TYPE_LONG_BLOB:
   case MYSQL_TYPE_ENUM:
   case MYSQL_TYPE_SET:
-    return true;
+    return TRUE;
   case MYSQL_TYPE_NULL:
   case MYSQL_TYPE_GEOMETRY:
     break;
   }
-  return false;
+  return FALSE;
 }
 
 
@@ -277,7 +437,7 @@ int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
 */
 
 int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
-                                 uint fieldnr)
+                                 uint fieldnr, bool *set_blob_value)
 {
   const byte* field_ptr= field->ptr;
   uint32 pack_len= field->pack_length();
@@ -289,6 +449,13 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
 
   if (ndb_supported_type(field->type()))
   {
+    // ndb currently does not support size 0
+    const byte *empty_field= "";
+    if (pack_len == 0)
+    {
+      pack_len= 1;
+      field_ptr= empty_field;
+    }
     if (! (field->flags & BLOB_FLAG))
     {
       if (field->is_null())
@@ -312,14 +479,18 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
       char* blob_ptr= NULL;
       field_blob->get_ptr(&blob_ptr);
 
-      // Looks like NULL blob can also be signaled in this way
-      if (blob_ptr == NULL)
-        DBUG_RETURN(ndb_blob->setNull() != 0);
+      // Looks like NULL ptr signals length 0 blob
+      if (blob_ptr == NULL) {
+        DBUG_ASSERT(blob_len == 0);
+        blob_ptr= (char*)"";
+      }
 
       DBUG_PRINT("value", ("set blob ptr=%x len=%u",
                            (unsigned)blob_ptr, blob_len));
       DBUG_DUMP("value", (char*)blob_ptr, min(blob_len, 26));
 
+      if (set_blob_value)
+        *set_blob_value= TRUE;
       // No callback needed to write value
       DBUG_RETURN(ndb_blob->setValue(blob_ptr, blob_len) != 0);
     }
@@ -414,7 +585,7 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
 */
 
 int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
-                                 uint fieldnr)
+                                 uint fieldnr, byte* buf)
 {
   DBUG_ENTER("get_ndb_value");
   DBUG_PRINT("enter", ("fieldnr: %d flags: %o", fieldnr,
@@ -422,12 +593,19 @@ int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
 
   if (field != NULL)
   {
+    DBUG_ASSERT(buf);
     if (ndb_supported_type(field->type()))
    {
       DBUG_ASSERT(field->ptr != NULL);
       if (! (field->flags & BLOB_FLAG))
-      {
-        m_value[fieldnr].rec= ndb_op->getValue(fieldnr, field->ptr);
+      {
+        byte *field_buf;
+        if (field->pack_length() != 0)
+          field_buf= buf + (field->ptr - table->record[0]);
+        else
+          field_buf= dummy_buf;
+        m_value[fieldnr].rec= ndb_op->getValue(fieldnr,
+                                               field_buf);
         DBUG_RETURN(m_value[fieldnr].rec == NULL);
       }
 
@@ -459,24 +637,24 @@ int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
 bool ha_ndbcluster::uses_blob_value(bool all_fields)
 {
   if (table->blob_fields == 0)
-    return false;
+    return FALSE;
   if (all_fields)
-    return true;
+    return TRUE;
   {
     uint no_fields= table->fields;
     int i;
-    THD *thd= current_thd;
+    THD *thd= table->in_use;
     // They always put blobs at the end..
     for (i= no_fields - 1; i >= 0; i--)
     {
       Field *field= table->field[i];
       if (thd->query_id == field->query_id)
       {
-        return true;
+        return TRUE;
       }
     }
   }
-  return false;
+  return FALSE;
 }
 
 
@@ -494,75 +672,77 @@ int ha_ndbcluster::get_metadata(const char *path)
 {
   NDBDICT *dict= m_ndb->getDictionary();
   const NDBTAB *tab;
-  const void *data, *pack_data;
-  const char **key_name;
-  uint ndb_columns, mysql_columns, length, pack_length;
   int error;
+  bool invalidating_ndb_table= FALSE;
+
   DBUG_ENTER("get_metadata");
   DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path));
 
-  if (!(tab= dict->getTable(m_tabname)))
-    ERR_RETURN(dict->getNdbError());
-  DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
-
-  /*
-    This is the place to check that the table we got from NDB
-    is equal to the one on local disk
-  */
-  ndb_columns= (uint) tab->getNoOfColumns();
-  mysql_columns= table->fields;
-  if (table->primary_key == MAX_KEY)
-    ndb_columns--;
-  if (ndb_columns != mysql_columns)
-  {
-    DBUG_PRINT("error",
-               ("Wrong number of columns, ndb: %d mysql: %d",
-                ndb_columns, mysql_columns));
-    DBUG_RETURN(HA_ERR_OLD_METADATA);
-  }
-
-  /*
-    Compare FrmData in NDB with frm file from disk.
-  */
-  error= 0;
-  if (readfrm(path, &data, &length) ||
-      packfrm(data, length, &pack_data, &pack_length))
-  {
-    my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
-    my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
-    DBUG_RETURN(1);
-  }
+  do {
+    const void *data, *pack_data;
+    uint length, pack_length;
+
+    if (!(tab= dict->getTable(m_tabname)))
+      ERR_RETURN(dict->getNdbError());
+    DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
+    /*
+      Compare FrmData in NDB with frm file from disk.
+    */
+    error= 0;
+    if (readfrm(path, &data, &length) ||
+        packfrm(data, length, &pack_data, &pack_length))
+    {
+      my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
+      my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
+      DBUG_RETURN(1);
+    }
 
-  if ((pack_length != tab->getFrmLength()) ||
-      (memcmp(pack_data, tab->getFrmData(), pack_length)))
-  {
-    DBUG_PRINT("error",
-               ("metadata, pack_length: %d getFrmLength: %d memcmp: %d",
-                pack_length, tab->getFrmLength(),
-                memcmp(pack_data, tab->getFrmData(), pack_length)));
-    DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
-    DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
-    error= HA_ERR_OLD_METADATA;
-  }
-  my_free((char*)data, MYF(0));
-  my_free((char*)pack_data, MYF(0));
+    if ((pack_length != tab->getFrmLength()) ||
+        (memcmp(pack_data, tab->getFrmData(), pack_length)))
+    {
+      if (!invalidating_ndb_table)
+      {
+        DBUG_PRINT("info", ("Invalidating table"));
+        dict->invalidateTable(m_tabname);
+        invalidating_ndb_table= TRUE;
+      }
+      else
+      {
+        DBUG_PRINT("error",
+                   ("metadata, pack_length: %d getFrmLength: %d memcmp: %d",
                    pack_length, tab->getFrmLength(),
+                    memcmp(pack_data, tab->getFrmData(), pack_length)));
+        DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
+        DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
+        error= 3;
+        invalidating_ndb_table= FALSE;
+      }
+    }
+    else
+    {
+      invalidating_ndb_table= FALSE;
+    }
+    my_free((char*)data, MYF(0));
+    my_free((char*)pack_data, MYF(0));
+  } while (invalidating_ndb_table);
+
   if (error)
     DBUG_RETURN(error);
 
-  // All checks OK, lets use the table
-  m_table= (void*)tab;
-
+  m_table= NULL;
+  m_table_info= NULL;
+
   DBUG_RETURN(build_index_list(table, ILBP_OPEN));
 }
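
The rewritten `get_metadata()` no longer fails outright on a .frm mismatch: it invalidates the cached NDB dictionary entry once and compares again, so a stale cache heals itself and only a genuine mismatch returns an error. The retry shape, modelled standalone with a hypothetical dictionary type:

```cpp
#include <string>

// Standalone model of the invalidate-and-retry loop above. A mismatch is
// only fatal on the second pass, after the cached definition has been
// thrown away once (types and methods here are illustrative).
struct Dictionary
{
  std::string cached_frm, committed_frm;
  const std::string &get_frm() const { return cached_frm; }
  void invalidate() { cached_frm= committed_frm; }  // refetch from cluster
};

int check_frm(Dictionary &dict, const std::string &local_frm)
{
  bool invalidating= false;
  do
  {
    if (dict.get_frm() != local_frm)
    {
      if (!invalidating)   // first mismatch: maybe just a stale cache
      {
        dict.invalidate();
        invalidating= true;
      }
      else
        return 3;          // still different: genuine metadata mismatch
    }
    else
      invalidating= false;
  } while (invalidating);
  return 0;
}
```
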
 int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase)
 {
+  uint i;
   int error= 0;
-  char *name;
-  const char *index_name;
+  const char *name, *index_name;
+  char unique_index_name[FN_LEN];
   static const char* unique_suffix= "$unique";
-  uint i, name_len;
   KEY* key_info= tab->key_info;
   const char **key_name= tab->keynames.type_names;
   NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
@@ -576,21 +756,15 @@ int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase)
     m_index[i].type= idx_type;
     if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
     {
-      name_len= strlen(index_name)+strlen(unique_suffix)+1;
-      // Create name for unique index by appending "$unique";
-      if (!(name= my_malloc(name_len, MYF(MY_WME))))
-        DBUG_RETURN(2);
-      strxnmov(name, name_len, index_name, unique_suffix, NullS);
-      m_index[i].unique_name= name;
-      DBUG_PRINT("info", ("Created unique index name: %s for index %d",
-                          name, i));
+      strxnmov(unique_index_name, FN_LEN, index_name, unique_suffix, NullS);
+      DBUG_PRINT("info", ("Created unique index name \'%s\' for index %d",
+                          unique_index_name, i));
     }
     // Create secondary indexes if in create phase
     if (phase == ILBP_CREATE)
     {
-      DBUG_PRINT("info", ("Creating index %u: %s", i, index_name));
-
-      switch (m_index[i].type){
+      DBUG_PRINT("info", ("Creating index %u: %s", i, index_name));
+      switch (idx_type){
 
       case PRIMARY_KEY_INDEX:
         // Do nothing, already created
@@ -600,16 +774,16 @@ int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase)
         break;
       case UNIQUE_ORDERED_INDEX:
         if (!(error= create_ordered_index(index_name, key_info)))
-          error= create_unique_index(get_unique_index_name(i), key_info);
+          error= create_unique_index(unique_index_name, key_info);
         break;
       case UNIQUE_INDEX:
-        error= create_unique_index(get_unique_index_name(i), key_info);
+        error= create_unique_index(unique_index_name, key_info);
         break;
       case ORDERED_INDEX:
         error= create_ordered_index(index_name, key_info);
         break;
       default:
-        DBUG_ASSERT(false);
+        DBUG_ASSERT(FALSE);
         break;
       }
       if (error)
@@ -620,21 +794,20 @@ int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase)
       }
     }
     // Add handles to index objects
-    DBUG_PRINT("info", ("Trying to add handle to index %s", index_name));
-    if ((m_index[i].type != PRIMARY_KEY_INDEX) &&
-        (m_index[i].type != UNIQUE_INDEX))
+    if (idx_type != PRIMARY_KEY_INDEX && idx_type != UNIQUE_INDEX)
     {
+      DBUG_PRINT("info", ("Get handle to index %s", index_name));
       const NDBINDEX *index= dict->getIndex(index_name, m_tabname);
       if (!index) DBUG_RETURN(1);
       m_index[i].index= (void *) index;
     }
-    if (m_index[i].unique_name)
+    if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
     {
-      const NDBINDEX *index= dict->getIndex(m_index[i].unique_name, m_tabname);
+      DBUG_PRINT("info", ("Get handle to unique_index %s", unique_index_name));
+      const NDBINDEX *index= dict->getIndex(unique_index_name, m_tabname);
       if (!index) DBUG_RETURN(1);
       m_index[i].unique_index= (void *) index;
     }
-    DBUG_PRINT("info", ("Added handle to index %s", index_name));
   }
 
   DBUG_RETURN(error);
@@ -665,13 +838,11 @@ void ha_ndbcluster::release_metadata()
   DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
 
   m_table= NULL;
+  m_table_info= NULL;
 
   // Release index list
   for (i= 0; i < MAX_KEY; i++)
   {
-    if (m_index[i].unique_name)
-      my_free((char*)m_index[i].unique_name, MYF(0));
-    m_index[i].unique_name= NULL;
     m_index[i].unique_index= NULL;
     m_index[i].index= NULL;
   }
@@ -681,17 +852,12 @@ void ha_ndbcluster::release_metadata()
 
 int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
 {
-  int lm;
-  if (type == TL_WRITE_ALLOW_WRITE)
-    lm= NdbScanOperation::LM_Exclusive;
+  if (type >= TL_WRITE_ALLOW_WRITE)
+    return NdbOperation::LM_Exclusive;
   else if (uses_blob_value(retrieve_all_fields))
-    /*
-      TODO use a new scan mode to read + lock + keyinfo
-    */
-    lm= NdbScanOperation::LM_Exclusive;
+    return NdbOperation::LM_Read;
   else
-    lm= NdbScanOperation::LM_CommittedRead;
-  return lm;
+    return NdbOperation::LM_CommittedRead;
 }
 
 static const ulong index_type_flags[]=
@@ -709,33 +875,26 @@ static const ulong index_type_flags[]=
     through the index.
   */
   // HA_KEYREAD_ONLY |
-  HA_READ_NEXT |
-  HA_READ_RANGE,
+  HA_READ_NEXT |
+  HA_READ_RANGE |
+  HA_READ_ORDER,
 
   /* UNIQUE_INDEX */
   HA_ONLY_WHOLE_INDEX,
 
   /* UNIQUE_ORDERED_INDEX */
-  HA_READ_NEXT |
-  HA_READ_RANGE,
+  HA_READ_NEXT |
+  HA_READ_RANGE |
+  HA_READ_ORDER,
 
   /* ORDERED_INDEX */
-  HA_READ_NEXT |
-  HA_READ_RANGE,
+  HA_READ_NEXT |
+  HA_READ_RANGE |
+  HA_READ_ORDER
 };
 
 static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong);
 
-inline const char* ha_ndbcluster::get_index_name(uint idx_no) const
-{
-  return table->keynames.type_names[idx_no];
-}
-
-inline const char* ha_ndbcluster::get_unique_index_name(uint idx_no) const
-{
-  return m_index[idx_no].unique_name;
-}
-
 inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
 {
   DBUG_ASSERT(idx_no < MAX_KEY);
@@ -829,8 +988,10 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
   DBUG_PRINT("enter", ("key_len: %u", key_len));
   DBUG_DUMP("key", (char*)key, key_len);
 
-  if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) ||
-      op->readTuple() != 0)
+  NdbOperation::LockMode lm=
+    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+  if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
+      op->readTuple(lm) != 0)
     ERR_RETURN(trans->getNdbError());
 
   if (table->primary_key == MAX_KEY)
@@ -842,7 +1003,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
       ERR_RETURN(trans->getNdbError());
 
     // Read key at the same time, for future reference
-    if (get_ndb_value(op, NULL, no_fields))
+    if (get_ndb_value(op, NULL, no_fields, NULL))
      ERR_RETURN(trans->getNdbError());
   }
   else
@@ -859,7 +1020,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
     if ((thd->query_id == field->query_id) ||
         retrieve_all_fields)
     {
-      if (get_ndb_value(op, field, i))
+      if (get_ndb_value(op, field, i, buf))
         ERR_RETURN(trans->getNdbError());
     }
     else
@@ -869,7 +1030,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
     }
   }
 
-  if (trans->execute(NoCommit, IgnoreError) != 0)
+  if (execute_no_commit_ie(this,trans) != 0)
   {
     table->status= STATUS_NOT_FOUND;
     DBUG_RETURN(ndb_err(trans));
@@ -898,8 +1059,10 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
     // We have allready retrieved all fields, nothing to complement
     DBUG_RETURN(0);
 
-  if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) ||
-      op->readTuple() != 0)
+  NdbOperation::LockMode lm=
+    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+  if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
+      op->readTuple(lm) != 0)
     ERR_RETURN(trans->getNdbError());
 
   int res;
@@ -913,12 +1076,12 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
     if (!(field->flags & PRI_KEY_FLAG) &&
         (thd->query_id != field->query_id))
     {
-      if (get_ndb_value(op, field, i))
+      if (get_ndb_value(op, field, i, new_data))
        ERR_RETURN(trans->getNdbError());
     }
   }
 
-  if (trans->execute(NoCommit) != 0)
+  if (execute_no_commit(this,trans) != 0)
   {
     table->status= STATUS_NOT_FOUND;
     DBUG_RETURN(ndb_err(trans));
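
`get_ndb_lock_type()` now yields an `NdbOperation::LockMode` that every access path passes to `readTuple()`/`readTuples()`. The decision itself is small enough to model standalone (enum values here are illustrative, not the server's):

```cpp
// Standalone model of the lock-mode choice above. Anything at or above
// TL_WRITE_ALLOW_WRITE reads with an exclusive lock, blob reads need a
// shared lock, and everything else can use committed read.
enum thr_lock_type { TL_READ= 1, TL_WRITE_ALLOW_WRITE= 4, TL_WRITE= 6 };
enum LockMode { LM_Read, LM_Exclusive, LM_CommittedRead };

LockMode ndb_lock_mode(thr_lock_type type, bool reads_blobs)
{
  if (type >= TL_WRITE_ALLOW_WRITE)
    return LM_Exclusive;     // row will likely be updated or deleted
  if (reads_blobs)
    return LM_Read;          // blob parts must stay consistent with the row
  return LM_CommittedRead;   // cheapest: no lock held after the read
}
```
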
@@ -948,12 +1111,13 @@ int ha_ndbcluster::unique_index_read(const byte *key,
   DBUG_ENTER("unique_index_read");
   DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index));
   DBUG_DUMP("key", (char*)key, key_len);
-  DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index)));
 
+  NdbOperation::LockMode lm=
+    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
   if (!(op= trans->getNdbIndexOperation((NDBINDEX *)
                                         m_index[active_index].unique_index,
-                                        (NDBTAB *) m_table)) ||
-      op->readTuple() != 0)
+                                        (const NDBTAB *) m_table)) ||
+      op->readTuple(lm) != 0)
     ERR_RETURN(trans->getNdbError());
 
   // Set secondary index key(s)
@@ -976,7 +1140,7 @@ int ha_ndbcluster::unique_index_read(const byte *key,
     if ((thd->query_id == field->query_id) ||
         (field->flags & PRI_KEY_FLAG))
     {
-      if (get_ndb_value(op, field, i))
+      if (get_ndb_value(op, field, i, buf))
         ERR_RETURN(op->getNdbError());
     }
     else
@@ -986,7 +1150,7 @@ int ha_ndbcluster::unique_index_read(const byte *key,
     }
   }
 
-  if (trans->execute(NoCommit, IgnoreError) != 0)
+  if (execute_no_commit_ie(this,trans) != 0)
   {
     table->status= STATUS_NOT_FOUND;
     DBUG_RETURN(ndb_err(trans));
@@ -1022,7 +1186,7 @@ inline int ha_ndbcluster::next_result(byte *buf)
     If this an update or delete, call nextResult with false
     to process any records already cached in NdbApi
   */
-  bool contact_ndb= m_lock.type != TL_WRITE_ALLOW_WRITE;
+  bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE;
   do {
     DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
     /*
@@ -1030,10 +1194,10 @@ inline int ha_ndbcluster::next_result(byte *buf)
     */
     if (ops_pending && blobs_pending)
     {
-      if (trans->execute(NoCommit) != 0)
-        DBUG_RETURN(ndb_err(trans));
+      if (execute_no_commit(this,trans) != 0)
+        DBUG_RETURN(ndb_err(trans));
       ops_pending= 0;
-      blobs_pending= false;
+      blobs_pending= FALSE;
     }
     check= cursor->nextResult(contact_ndb);
     if (check == 0)
@@ -1056,9 +1220,22 @@ inline int ha_ndbcluster::next_result(byte *buf)
         be sent to NDB
       */
       DBUG_PRINT("info", ("ops_pending: %d", ops_pending));
-      if (ops_pending && (trans->execute(NoCommit) != 0))
-        DBUG_RETURN(ndb_err(trans));
-      ops_pending= 0;
+      if (ops_pending)
+      {
+        if (current_thd->transaction.on)
+        {
+          if (execute_no_commit(this,trans) != 0)
+            DBUG_RETURN(ndb_err(trans));
+        }
+        else
+        {
+          if (execute_commit(this,trans) != 0)
+            DBUG_RETURN(ndb_err(trans));
+          int res= trans->restart();
+          DBUG_ASSERT(res == 0);
+        }
+        ops_pending= 0;
+      }
 
       contact_ndb= (check == 2);
     }
@@ -1073,76 +1250,204 @@ inline int ha_ndbcluster::next_result(byte *buf)
   DBUG_RETURN(HA_ERR_END_OF_FILE);
 }
 
-
 /*
-  Set bounds for a ordered index scan, use key_range
+  Set bounds for ordered index scan.
 */
 
 int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
-                              const key_range *key,
-                              int bound)
+                              const key_range *keys[2])
 {
-  uint key_len, key_store_len, tot_len, key_tot_len;
-  byte *key_ptr;
-  KEY* key_info= table->key_info + active_index;
-  KEY_PART_INFO* key_part= key_info->key_part;
-  KEY_PART_INFO* end= key_part+key_info->key_parts;
-  Field* field;
-  bool key_nullable, key_null;
+  const KEY *const key_info= table->key_info + active_index;
+  const uint key_parts= key_info->key_parts;
+  uint key_tot_len[2];
+  uint tot_len;
+  uint i, j;
 
   DBUG_ENTER("set_bounds");
-  DBUG_PRINT("enter", ("bound: %d", bound));
-  DBUG_PRINT("enter", ("key_parts: %d", key_info->key_parts));
-  DBUG_PRINT("enter", ("key->length: %d", key->length));
-  DBUG_PRINT("enter", ("key->flag: %d", key->flag));
+  DBUG_PRINT("info", ("key_parts=%d", key_parts));
 
-  // Set bounds using key data
+  for (j= 0; j <= 1; j++)
+  {
+    const key_range *key= keys[j];
+    if (key != NULL)
+    {
+      // for key->flag see ha_rkey_function
+      DBUG_PRINT("info", ("key %d length=%d flag=%d",
+                          j, key->length, key->flag));
+      key_tot_len[j]= key->length;
+    }
+    else
+    {
+      DBUG_PRINT("info", ("key %d not present", j));
+      key_tot_len[j]= 0;
+    }
+  }
   tot_len= 0;
-  key_ptr= (byte *) key->key;
-  key_tot_len= key->length;
-  for (; key_part != end; key_part++)
-  {
-    field= key_part->field;
-    key_len= key_part->length;
-    key_store_len= key_part->store_length;
-    key_nullable= (bool) key_part->null_bit;
-    key_null= (field->maybe_null() && *key_ptr);
-    tot_len+= key_store_len;
-
-    const char* bounds[]= {"LE", "LT", "GE", "GT", "EQ"};
-    DBUG_ASSERT(bound >= 0 && bound <= 4);
-    DBUG_PRINT("info", ("Set Bound%s on %s %s %s %s",
-                        bounds[bound],
-                        field->field_name,
-                        key_nullable ? "NULLABLE" : "",
-                        key_null ? "NULL":""));
-    DBUG_PRINT("info", ("Total length %ds", tot_len));
-
-    DBUG_DUMP("key", (char*) key_ptr, key_store_len);
-
-    if (op->setBound(field->field_name,
-                     bound,
-                     key_null ? 0 : (key_nullable ? key_ptr + 1 : key_ptr),
-                     key_null ? 0 : key_len) != 0)
-      ERR_RETURN(op->getNdbError());
-
-    key_ptr+= key_store_len;
-    if (tot_len >= key_tot_len)
-      break;
+  for (i= 0; i < key_parts; i++)
+  {
+    KEY_PART_INFO *key_part= &key_info->key_part[i];
+    Field *field= key_part->field;
+    uint part_len= key_part->length;
+    uint part_store_len= key_part->store_length;
+    bool part_nullable= (bool) key_part->null_bit;
+    // Info about each key part
+    struct part_st {
+      bool part_last;
+      const key_range *key;
+      const byte *part_ptr;
+      bool part_null;
+      int bound_type;
+      const char* bound_ptr;
+    };
+    struct part_st part[2];
+
+    for (j= 0; j <= 1; j++)
+    {
+      struct part_st &p = part[j];
+      p.key= NULL;
+      p.bound_type= -1;
+      if (tot_len < key_tot_len[j])
+      {
+        p.part_last= (tot_len + part_store_len >= key_tot_len[j]);
+        p.key= keys[j];
+        p.part_ptr= &p.key->key[tot_len];
+        p.part_null= (field->maybe_null() && *p.part_ptr);
+        p.bound_ptr= (const char *)
+          p.part_null ? 0 : part_nullable ? p.part_ptr + 1 : p.part_ptr;
+
+        if (j == 0)
+        {
+          switch (p.key->flag)
+          {
+          case HA_READ_KEY_EXACT:
+            p.bound_type= NdbIndexScanOperation::BoundEQ;
+            break;
+          case HA_READ_KEY_OR_NEXT:
+            p.bound_type= NdbIndexScanOperation::BoundLE;
+            break;
+          case HA_READ_AFTER_KEY:
+            if (! p.part_last)
+              p.bound_type= NdbIndexScanOperation::BoundLE;
+            else
+              p.bound_type= NdbIndexScanOperation::BoundLT;
+            break;
+          default:
+            break;
+          }
+        }
+        if (j == 1) {
+          switch (p.key->flag)
+          {
+          case HA_READ_BEFORE_KEY:
+            if (! p.part_last)
+              p.bound_type= NdbIndexScanOperation::BoundGE;
+            else
+              p.bound_type= NdbIndexScanOperation::BoundGT;
+            break;
+          case HA_READ_AFTER_KEY:     // weird
+            p.bound_type= NdbIndexScanOperation::BoundGE;
+            break;
+          default:
+            break;
+          }
+        }
 
-    /*
-      Only one bound which is not EQ can be set
-      so if this bound was not EQ, bail out and make
-      a best effort attempt
-    */
-    if (bound != NdbIndexScanOperation::BoundEQ)
-      break;
-  }
+        if (p.bound_type == -1)
+        {
+          DBUG_PRINT("error", ("key %d unknown flag %d", j, p.key->flag));
+          DBUG_ASSERT(false);
+          // Stop setting bounds but continue with what we have
+          DBUG_RETURN(0);
+        }
+      }
+    }
+
+    // Seen with e.g. b = 1 and c > 1
+    if (part[0].bound_type == NdbIndexScanOperation::BoundLE &&
+        part[1].bound_type == NdbIndexScanOperation::BoundGE &&
+        memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0)
+    {
+      DBUG_PRINT("info", ("replace LE/GE pair by EQ"));
+      part[0].bound_type= NdbIndexScanOperation::BoundEQ;
+      part[1].bound_type= -1;
+    }
+    // Not seen but was in previous version
+    if (part[0].bound_type == NdbIndexScanOperation::BoundEQ &&
+        part[1].bound_type == NdbIndexScanOperation::BoundGE &&
+        memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0)
+    {
+      DBUG_PRINT("info", ("remove GE from EQ/GE pair"));
+      part[1].bound_type= -1;
+    }
+
+    for (j= 0; j <= 1; j++)
+    {
+      struct part_st &p = part[j];
+      // Set bound if not done with this key
+      if (p.key != NULL)
+      {
+        DBUG_PRINT("info", ("key %d:%d offset=%d length=%d last=%d bound=%d",
+                            j, i, tot_len, part_len, p.part_last, p.bound_type));
+        DBUG_DUMP("info", (const char*)p.part_ptr, part_store_len);
+
+        // Set bound if not cancelled via type -1
+        if (p.bound_type != -1)
+          if (op->setBound(field->field_name, p.bound_type, p.bound_ptr))
+            ERR_RETURN(op->getNdbError());
+      }
+    }
+
+    tot_len+= part_store_len;
+  }
 
   DBUG_RETURN(0);
 }
 
+inline
+int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
+{
+  uint i;
+  THD *thd= current_thd;
+  NdbConnection *trans= m_active_trans;
+
+  DBUG_ENTER("define_read_attrs");
+
+  // Define attributes to read
+  for (i= 0; i < table->fields; i++)
+  {
+    Field *field= table->field[i];
+    if ((thd->query_id == field->query_id) ||
+        (field->flags & PRI_KEY_FLAG) ||
+        retrieve_all_fields)
+    {
+      if (get_ndb_value(op, field, i, buf))
+        ERR_RETURN(op->getNdbError());
+    }
+    else
+    {
+      m_value[i].ptr= NULL;
+    }
+  }
+
+  if (table->primary_key == MAX_KEY)
+  {
+    DBUG_PRINT("info", ("Getting hidden key"));
+    // Scanning table with no primary key
+    int hidden_no= table->fields;
+#ifndef DBUG_OFF
+    const NDBTAB *tab= (const NDBTAB *) m_table;
+    if (!tab->getColumn(hidden_no))
+      DBUG_RETURN(1);
+#endif
+    if (get_ndb_value(op, NULL, hidden_no, NULL))
+      ERR_RETURN(op->getNdbError());
+  }
+
+  if (execute_no_commit(this,trans) != 0)
+    DBUG_RETURN(ndb_err(trans));
+  DBUG_PRINT("exit", ("Scan started successfully"));
+  DBUG_RETURN(next_result(buf));
+}
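
`set_bounds()` now receives both range endpoints at once and derives an NDB bound type per key part from the MySQL read flag, with the LE/GE-to-EQ merge shown above. The flag mapping in isolation (enum values follow the removed `bounds[]` table: LE, LT, GE, GT, EQ):

```cpp
// Standalone model of the flag-to-bound mapping above; enums redeclared
// here. In this era of the NDB API, BoundLE/BoundLT are the *lower* bounds
// of the range and BoundGE/BoundGT the *upper* bounds.
enum ha_rkey_function { HA_READ_KEY_EXACT, HA_READ_KEY_OR_NEXT,
                        HA_READ_AFTER_KEY, HA_READ_BEFORE_KEY };
enum BoundType { BoundLE= 0, BoundLT= 1, BoundGE= 2, BoundGT= 3, BoundEQ= 4,
                 BoundNone= -1 };

// Bound chosen for the start key (j == 0 in the loop above).
int start_bound(ha_rkey_function flag, bool last_part)
{
  switch (flag) {
  case HA_READ_KEY_EXACT:   return BoundEQ;
  case HA_READ_KEY_OR_NEXT: return BoundLE;
  case HA_READ_AFTER_KEY:   return last_part ? BoundLT : BoundLE;
  default:                  return BoundNone;
  }
}

// Bound chosen for the end key (j == 1 in the loop above).
int end_bound(ha_rkey_function flag, bool last_part)
{
  switch (flag) {
  case HA_READ_BEFORE_KEY:  return last_part ? BoundGT : BoundGE;
  case HA_READ_AFTER_KEY:   return BoundGE;   // the "weird" case kept above
  default:                  return BoundNone;
  }
}
```
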
 
 /*
   Start ordered index scan in NDB
@@ -1152,52 +1457,60 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
                                       const key_range *end_key,
                                       bool sorted, byte* buf)
 {
+  bool restart;
   NdbConnection *trans= m_active_trans;
   NdbResultSet *cursor;
   NdbIndexScanOperation *op;
-  const char *index_name;
 
   DBUG_ENTER("ordered_index_scan");
   DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted));
   DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
-
-  index_name= get_index_name(active_index);
-  if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
-                                            m_index[active_index].index,
-                                            (NDBTAB *) m_table)))
-    ERR_RETURN(trans->getNdbError());
-  NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
-    get_ndb_lock_type(m_lock.type);
-  if (!(cursor= op->readTuples(lm, 0, parallelism, sorted)))
-    ERR_RETURN(trans->getNdbError());
-  m_active_cursor= cursor;
+  // Check that sorted seems to be initialised
+  DBUG_ASSERT(sorted == 0 || sorted == 1);
+
+  if (m_active_cursor == 0)
+  {
+    restart= false;
+    NdbOperation::LockMode lm=
+      (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+    if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
+                                              m_index[active_index].index,
+                                              (const NDBTAB *) m_table)) ||
+        !(cursor= op->readTuples(lm, 0, parallelism, sorted)))
+      ERR_RETURN(trans->getNdbError());
+    m_active_cursor= cursor;
+  } else {
+    restart= true;
+    op= (NdbIndexScanOperation*)m_active_cursor->getOperation();
+
+    DBUG_ASSERT(op->getSorted() == sorted);
+    DBUG_ASSERT(op->getLockMode() ==
                (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
+    if(op->reset_bounds())
+      DBUG_RETURN(ndb_err(m_active_trans));
+  }
 
-  if (start_key &&
-      set_bounds(op, start_key,
-                 (start_key->flag == HA_READ_KEY_EXACT) ?
-                 NdbIndexScanOperation::BoundEQ :
-                 (start_key->flag == HA_READ_AFTER_KEY) ?
-                 NdbIndexScanOperation::BoundLT :
-                 NdbIndexScanOperation::BoundLE))
-    DBUG_RETURN(1);
+  {
+    const key_range *keys[2]= { start_key, end_key };
+    int ret= set_bounds(op, keys);
+    if (ret)
+      DBUG_RETURN(ret);
+  }
 
-  if (end_key)
+  if (!restart)
   {
-    if (start_key && start_key->flag == HA_READ_KEY_EXACT)
-    {
-      DBUG_PRINT("info", ("start_key is HA_READ_KEY_EXACT ignoring end_key"));
-    }
-    else if (set_bounds(op, end_key,
-                        (end_key->flag == HA_READ_AFTER_KEY) ?
-                        NdbIndexScanOperation::BoundGE :
-                        NdbIndexScanOperation::BoundGT))
-      DBUG_RETURN(1);
+    DBUG_RETURN(define_read_attrs(buf, op));
+  }
+  else
+  {
+    if (execute_no_commit(this,trans) != 0)
+      DBUG_RETURN(ndb_err(trans));
+
+    DBUG_RETURN(next_result(buf));
   }
-  DBUG_RETURN(define_read_attrs(buf, op));
 }
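
When a range read arrives while a cursor is already open, `ordered_index_scan()` now reuses the operation via `reset_bounds()` instead of building a new scan. A minimal sketch of that decision, with made-up types:

```cpp
// Standalone sketch of the reuse-or-restart decision above.
struct ScanOp { bool open= false; };

// Mirrors ordered_index_scan(): the first call creates the scan operation
// and defines which columns to read; later calls keep the same operation,
// merely clearing the old bounds before new ones are set and re-executed.
bool begin_ordered_scan(ScanOp &op)
{
  const bool restart= op.open;
  if (!restart)
    op.open= true;  // real code: getNdbIndexScanOperation() + readTuples()
  // on restart the real code instead calls reset_bounds() on the same op;
  // either way the caller then sets bounds and executes the transaction
  return restart;   // true: skip define_read_attrs(), go straight to fetch
}
```
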
-
 /*
   Start a filtered scan in NDB.
@@ -1227,11 +1540,10 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
   DBUG_PRINT("info", ("Starting a new filtered scan on %s",
                       m_tabname));
 
-  if (!(op= trans->getNdbScanOperation((NDBTAB *) m_table)))
-    ERR_RETURN(trans->getNdbError());
-  NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
-    get_ndb_lock_type(m_lock.type);
-  if (!(cursor= op->readTuples(lm, 0, parallelism)))
+  NdbOperation::LockMode lm=
+    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+  if (!(op= trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
+      !(cursor= op->readTuples(lm, 0, parallelism)))
     ERR_RETURN(trans->getNdbError());
   m_active_cursor= cursor;
 
@@ -1298,68 +1610,18 @@ int ha_ndbcluster::full_table_scan(byte *buf)
   DBUG_ENTER("full_table_scan");
   DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
 
-  if (!(op=trans->getNdbScanOperation((NDBTAB *) m_table)))
-    ERR_RETURN(trans->getNdbError());
-  NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
-    get_ndb_lock_type(m_lock.type);
-  if (!(cursor= op->readTuples(lm, 0, parallelism)))
+  NdbOperation::LockMode lm=
+    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+  if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
+      !(cursor= op->readTuples(lm, 0, parallelism)))
     ERR_RETURN(trans->getNdbError());
   m_active_cursor= cursor;
   DBUG_RETURN(define_read_attrs(buf, op));
 }
 
-
-inline
-int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
-{
-  uint i;
-  THD *thd= current_thd;
-  NdbConnection *trans= m_active_trans;
-
-  DBUG_ENTER("define_read_attrs");
-
-  // Define attributes to read
-  for (i= 0; i < table->fields; i++)
-  {
-    Field *field= table->field[i];
-    if ((thd->query_id == field->query_id) ||
-        (field->flags & PRI_KEY_FLAG) ||
-        retrieve_all_fields)
-    {
-      if (get_ndb_value(op, field, i))
-        ERR_RETURN(op->getNdbError());
-    }
-    else
-    {
-      m_value[i].ptr= NULL;
-    }
-  }
-
-  if (table->primary_key == MAX_KEY)
-  {
-    DBUG_PRINT("info", ("Getting hidden key"));
-    // Scanning table with no primary key
-    int hidden_no= table->fields;
-#ifndef DBUG_OFF
-    const NDBTAB *tab= (NDBTAB *) m_table;
-    if (!tab->getColumn(hidden_no))
-      DBUG_RETURN(1);
-#endif
-    if (get_ndb_value(op, NULL, hidden_no))
-      ERR_RETURN(op->getNdbError());
-  }
-
-  if (trans->execute(NoCommit) != 0)
-    DBUG_RETURN(ndb_err(trans));
-  DBUG_PRINT("exit", ("Scan started successfully"));
-  DBUG_RETURN(next_result(buf));
-}
-
-
 /*
   Insert one record into NDB
 */
-
 int ha_ndbcluster::write_row(byte *record)
 {
   bool has_auto_increment;
@@ -1370,14 +1632,18 @@ int ha_ndbcluster::write_row(byte *record)
   THD *thd= current_thd;
 
   DBUG_ENTER("write_row");
+
+  if(m_ignore_dup_key_not_supported)
+  {
+    DBUG_RETURN(HA_ERR_WRONG_COMMAND);
+  }
 
   statistic_increment(thd->status_var.ha_write_count, &LOCK_status);
-  if (table->timestamp_default_now)
-    update_timestamp(record+table->timestamp_default_now-1);
+  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
+    table->timestamp_field->set_time();
   has_auto_increment= (table->next_number_field && record == table->record[0]);
-  skip_auto_increment= table->auto_increment_field_not_null;
 
-  if (!(op= trans->getNdbOperation((NDBTAB *) m_table)))
+  if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)))
    ERR_RETURN(trans->getNdbError());
 
   res= (m_use_write) ? op->writeTuple() :op->insertTuple();
@@ -1387,7 +1653,7 @@ int ha_ndbcluster::write_row(byte *record)
   if (table->primary_key == MAX_KEY)
   {
     // Table has hidden primary key
-    Uint64 auto_value= m_ndb->getAutoIncrementValue((NDBTAB *) m_table);
+    Uint64 auto_value= m_ndb->getAutoIncrementValue((const NDBTAB *) m_table);
     if (set_hidden_key(op, table->fields, (const byte*)&auto_value))
       ERR_RETURN(op->getNdbError());
   }
@@ -1395,21 +1661,26 @@ int ha_ndbcluster::write_row(byte *record)
   {
     int res;
 
-    if ((has_auto_increment) && (!skip_auto_increment))
+    if (has_auto_increment)
+    {
+      skip_auto_increment= FALSE;
       update_auto_increment();
+      skip_auto_increment= !auto_increment_column_changed;
+    }
 
     if ((res= set_primary_key(op)))
      return res;
   }
 
   // Set non-key attribute(s)
+  bool set_blob_value= FALSE;
   for (i= 0; i < table->fields; i++)
  {
     Field *field= table->field[i];
     if (!(field->flags & PRI_KEY_FLAG) &&
-        set_ndb_value(op, field, i))
+        set_ndb_value(op, field, i, &set_blob_value))
     {
-      skip_auto_increment= true;
+      skip_auto_increment= TRUE;
       ERR_RETURN(op->getNdbError());
     }
   }
@@ -1422,20 +1693,38 @@ int ha_ndbcluster::write_row(byte *record)
     Find out how this is detected!
   */
   rows_inserted++;
-  bulk_insert_not_flushed= true;
+  no_uncommitted_rows_update(1);
+  bulk_insert_not_flushed= TRUE;
   if ((rows_to_insert == 1) ||
      ((rows_inserted % bulk_insert_rows) == 0) ||
-      uses_blob_value(false) != 0)
+      set_blob_value)
   {
+    THD *thd= current_thd;
     // Send rows to NDB
     DBUG_PRINT("info", ("Sending inserts to NDB, "\
                         "rows_inserted:%d, bulk_insert_rows: %d",
-                        (int)rows_inserted, (int)bulk_insert_rows));
-    bulk_insert_not_flushed= false;
-    if (trans->execute(NoCommit) != 0)
+                        (int)rows_inserted, (int)bulk_insert_rows));
+
+    bulk_insert_not_flushed= FALSE;
+    if (thd->transaction.on)
     {
-      skip_auto_increment= true;
-      DBUG_RETURN(ndb_err(trans));
+      if (execute_no_commit(this,trans) != 0)
+      {
+        skip_auto_increment= TRUE;
+        no_uncommitted_rows_execute_failure();
+        DBUG_RETURN(ndb_err(trans));
+      }
+    }
+    else
+    {
+      if (execute_commit(this,trans) != 0)
+      {
+        skip_auto_increment= TRUE;
+        no_uncommitted_rows_execute_failure();
+        DBUG_RETURN(ndb_err(trans));
+      }
+      int res= trans->restart();
+      DBUG_ASSERT(res == 0);
     }
   }
   if ((has_auto_increment) && (skip_auto_increment))
@@ -1444,11 +1733,11 @@ int ha_ndbcluster::write_row(byte *record)
     DBUG_PRINT("info",
               ("Trying to set next auto increment value to %lu",
                 (ulong) next_val));
-    if (m_ndb->setAutoIncrementValue((NDBTAB *) m_table, next_val, true))
+    if (m_ndb->setAutoIncrementValue((const NDBTAB *) m_table, next_val, TRUE))
      DBUG_PRINT("info",
                 ("Setting next auto increment value to %u", next_val));
   }
-  skip_auto_increment= true;
+  skip_auto_increment= TRUE;
 
   DBUG_RETURN(0);
 }
@@ -1502,9 +1791,9 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
   DBUG_ENTER("update_row");
 
   statistic_increment(thd->status_var.ha_update_count, &LOCK_status);
-  if (table->timestamp_on_update_now)
-    update_timestamp(new_data+table->timestamp_on_update_now-1);
-
+  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
+    table->timestamp_field->set_time();
+
   /* Check for update of primary key for special handling */
   if ((table->primary_key != MAX_KEY) &&
       (key_cmp(table->primary_key, old_data, new_data)))
@@ -1552,12 +1841,12 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
     if (!(op= cursor->updateTuple()))
       ERR_RETURN(trans->getNdbError());
     ops_pending++;
-    if (uses_blob_value(false))
-      blobs_pending= true;
+    if (uses_blob_value(FALSE))
+      blobs_pending= TRUE;
   }
   else
   {
-    if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) ||
+    if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
        op->updateTuple() != 0)
       ERR_RETURN(trans->getNdbError());
 
@@ -1595,8 +1884,10 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
   }
 
   // Execute update operation
-  if (!cursor && trans->execute(NoCommit) != 0)
+  if (!cursor && execute_no_commit(this,trans) != 0) {
+    no_uncommitted_rows_execute_failure();
     DBUG_RETURN(ndb_err(trans));
+  }
 
   DBUG_RETURN(0);
 }
@@ -1630,16 +1921,20 @@ int ha_ndbcluster::delete_row(const byte *record)
       ERR_RETURN(trans->getNdbError());
     ops_pending++;
 
+    no_uncommitted_rows_update(-1);
+
     // If deleting from cursor, NoCommit will be handled in next_result
     DBUG_RETURN(0);
   }
   else
   {
 
-    if (!(op=trans->getNdbOperation((NDBTAB *) m_table)) ||
+    if (!(op=trans->getNdbOperation((const NDBTAB *) m_table)) ||
        op->deleteTuple() != 0)
       ERR_RETURN(trans->getNdbError());
 
+    no_uncommitted_rows_update(-1);
+
     if (table->primary_key == MAX_KEY)
     {
       // This table has no primary key, use "hidden" primary key
@@ -1660,8 +1955,10 @@ int ha_ndbcluster::delete_row(const byte *record)
   }
 
   // Execute delete operation
-  if (trans->execute(NoCommit) != 0)
+  if (execute_no_commit(this,trans) != 0) {
+    no_uncommitted_rows_execute_failure();
     DBUG_RETURN(ndb_err(trans));
+  }
   DBUG_RETURN(0);
 }
 
@@ -1702,7 +1999,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
       else
      {
         NdbBlob* ndb_blob= (*value).blob;
-        bool isNull= true;
+        bool isNull= TRUE;
         int ret= ndb_blob->getNull(isNull);
         DBUG_ASSERT(ret == 0);
         if (isNull)
@@ -1717,7 +2014,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
   {
     // Table with hidden primary key
     int hidden_no= table->fields;
-    const NDBTAB *tab= (NDBTAB *) m_table;
+    const NDBTAB *tab= (const NDBTAB *) m_table;
     const NDBCOL *hidden_col= tab->getColumn(hidden_no);
     NdbRecAttr* rec= m_value[hidden_no].rec;
     DBUG_ASSERT(rec);
@@ -1735,7 +2032,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
 
 void ha_ndbcluster::print_results()
 {
-  const NDBTAB *tab= (NDBTAB*) m_table;
+  const NDBTAB *tab= (const NDBTAB*) m_table;
   DBUG_ENTER("print_results");
 
 #ifndef DBUG_OFF
@@ -1770,7 +2067,7 @@ void ha_ndbcluster::print_results()
       else
       {
         ndb_blob= value.blob;
-        bool isNull= true;
+        bool isNull= TRUE;
         ndb_blob->getNull(isNull);
         if (isNull) {
          fprintf(DBUG_FILE, "NULL\n");
@@ -1910,18 +2207,54 @@ int ha_ndbcluster::index_end()
 
 
 int ha_ndbcluster::index_read(byte *buf,
-                          const byte *key, uint key_len,
-                          enum ha_rkey_function find_flag)
+                              const byte *key, uint key_len,
+                              enum ha_rkey_function find_flag)
 {
   DBUG_ENTER("index_read");
   DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d",
                        active_index, key_len, find_flag));
 
+  int error;
+  ndb_index_type type = get_index_type(active_index);
+  const KEY* key_info = table->key_info+active_index;
+  switch (type){
+  case PRIMARY_KEY_ORDERED_INDEX:
+  case PRIMARY_KEY_INDEX:
+    if (find_flag == HA_READ_KEY_EXACT && key_info->key_length == key_len)
+    {
+      DBUG_RETURN(pk_read(key, key_len, buf));
+    }
+    else if (type == PRIMARY_KEY_INDEX)
+    {
+      DBUG_RETURN(1);
+    }
+    break;
+  case UNIQUE_ORDERED_INDEX:
+  case UNIQUE_INDEX:
+    if (find_flag == HA_READ_KEY_EXACT && key_info->key_length == key_len)
+    {
+      DBUG_RETURN(unique_index_read(key, key_len, buf));
+    }
+    else if (type == UNIQUE_INDEX)
+    {
+      DBUG_RETURN(1);
+    }
+    break;
+  case ORDERED_INDEX:
+    break;
+  default:
+  case UNDEFINED_INDEX:
+    DBUG_ASSERT(FALSE);
+    DBUG_RETURN(1);
+    break;
+  }
+
   key_range start_key;
-  start_key.key= key;
-  start_key.length= key_len;
-  start_key.flag= find_flag;
-  DBUG_RETURN(read_range_first(&start_key, NULL, false, true));
+  start_key.key = key;
+  start_key.length = key_len;
+  start_key.flag = find_flag;
+  error= ordered_index_scan(&start_key, 0, TRUE, buf);
+  DBUG_RETURN(error == HA_ERR_END_OF_FILE ? HA_ERR_KEY_NOT_FOUND : error);
 }
 
 
@@ -1962,7 +2295,10 @@ int ha_ndbcluster::index_first(byte *buf)
   DBUG_ENTER("index_first");
   statistic_increment(current_thd->status_var.ha_read_first_count,
                       &LOCK_status);
-  DBUG_RETURN(1);
+  // Start the ordered index scan and fetch the first row
+
+  // Only HA_READ_ORDER indexes get called by index_first
+  DBUG_RETURN(ordered_index_scan(0, 0, TRUE, buf));
 }
 
 
@@ -1970,23 +2306,31 @@ int ha_ndbcluster::index_last(byte *buf)
 {
   DBUG_ENTER("index_last");
   statistic_increment(current_thd->status_var.ha_read_last_count,&LOCK_status);
+  int res;
+  if((res= ordered_index_scan(0, 0, TRUE, buf)) == 0){
+    NdbResultSet *cursor= m_active_cursor;
+    while((res= cursor->nextResult(TRUE)) == 0);
+    if(res == 1){
+      unpack_record(buf);
+      table->status= 0;
+      DBUG_RETURN(0);
+    }
+  }
   DBUG_RETURN(1);
 }
 
 
-int ha_ndbcluster::read_range_first(const key_range *start_key,
-                                    const key_range *end_key,
-                                    bool eq_range, bool sorted)
+inline
+int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
+                                           const key_range *end_key,
+                                           bool eq_range, bool sorted,
+                                           byte* buf)
 {
   KEY* key_info;
   int error= 1;
-  byte* buf= table->record[0];
-  DBUG_ENTER("ha_ndbcluster::read_range_first");
+  DBUG_ENTER("ha_ndbcluster::read_range_first_to_buf");
   DBUG_PRINT("info", ("eq_range: %d, sorted: %d", eq_range, sorted));
 
-  if (m_active_cursor)
-    close_scan();
-
   switch (get_index_type(active_index)){
   case PRIMARY_KEY_ORDERED_INDEX:
   case PRIMARY_KEY_INDEX:
@@ -2014,15 +2358,26 @@ int ha_ndbcluster::read_range_first(const key_range *start_key,
     break;
   }
 
-  // Start the ordered index scan and fetch the first row
-  error= ordered_index_scan(start_key, end_key, sorted,
-                            buf);
-
+  error= ordered_index_scan(start_key, end_key, sorted, buf);
   DBUG_RETURN(error);
 }
 
+int ha_ndbcluster::read_range_first(const key_range *start_key,
+                                    const key_range *end_key,
+                                    bool eq_range, bool sorted)
+{
+  byte* buf= table->record[0];
+  DBUG_ENTER("ha_ndbcluster::read_range_first");
+
+  DBUG_RETURN(read_range_first_to_buf(start_key,
+                                      end_key,
+                                      eq_range,
+                                      sorted,
+                                      buf));
+}
+
 int ha_ndbcluster::read_range_next()
 {
   DBUG_ENTER("ha_ndbcluster::read_range_next");
@@ -2040,7 +2395,8 @@ int ha_ndbcluster::rnd_init(bool scan)
   {
     if (!scan)
      DBUG_RETURN(1);
-    cursor->restart();
+    int res= cursor->restart();
+    DBUG_ASSERT(res == 0);
   }
   index_init(table->primary_key);
   DBUG_RETURN(0);
@@ -2063,8 +2419,10 @@ int ha_ndbcluster::close_scan()
       deleteing/updating transaction before closing the scan
    */
     DBUG_PRINT("info", ("ops_pending: %d", ops_pending));
-    if (trans->execute(NoCommit) != 0)
+    if (execute_no_commit(this,trans) != 0) {
+      no_uncommitted_rows_execute_failure();
      DBUG_RETURN(ndb_err(trans));
+    }
    ops_pending= 0;
   }
 
@@ -2152,7 +2510,7 @@ void ha_ndbcluster::position(const byte *record)
     DBUG_PRINT("info", ("Getting hidden key"));
     int hidden_no= table->fields;
     NdbRecAttr* rec= m_value[hidden_no].rec;
-    const NDBTAB *tab= (NDBTAB *) m_table;
+    const NDBTAB *tab= (const NDBTAB *) m_table;
     const NDBCOL *hidden_col= tab->getColumn(hidden_no);
     DBUG_ASSERT(hidden_col->getPrimaryKey() &&
                hidden_col->getAutoIncrement() &&
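
`index_read()` above now routes exact whole-key lookups to single-row primary-key or unique-index reads and falls back to an ordered scan otherwise, translating scan EOF into "key not found". The dispatch, modelled standalone:

```cpp
// Standalone model of the index_read dispatch above (simplified types).
enum IndexType { PRIMARY_KEY_INDEX, PRIMARY_KEY_ORDERED_INDEX,
                 UNIQUE_INDEX, UNIQUE_ORDERED_INDEX, ORDERED_INDEX };

enum Access { PK_READ, UNIQUE_READ, ORDERED_SCAN, NOT_SUPPORTED };

Access choose_access(IndexType t, bool exact_whole_key)
{
  switch (t) {
  case PRIMARY_KEY_ORDERED_INDEX:
  case PRIMARY_KEY_INDEX:
    if (exact_whole_key) return PK_READ;
    // a hash-only PK index cannot serve partial or range reads
    return t == PRIMARY_KEY_INDEX ? NOT_SUPPORTED : ORDERED_SCAN;
  case UNIQUE_ORDERED_INDEX:
  case UNIQUE_INDEX:
    if (exact_whole_key) return UNIQUE_READ;
    return t == UNIQUE_INDEX ? NOT_SUPPORTED : ORDERED_SCAN;
  case ORDERED_INDEX:
    return ORDERED_SCAN;
  }
  return NOT_SUPPORTED;
}
```
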
("HA_STATUS_TIME")); - if (flag & HA_STATUS_CONST) - DBUG_PRINT("info", ("HA_STATUS_CONST")); if (flag & HA_STATUS_VARIABLE) + { DBUG_PRINT("info", ("HA_STATUS_VARIABLE")); + if (m_table_info) + { + records_update(); + } + else + { + Uint64 rows; + if(ndb_get_table_statistics(m_ndb, m_tabname, &rows, 0) == 0){ + records= rows; + } + } + } + if (flag & HA_STATUS_CONST) + { + DBUG_PRINT("info", ("HA_STATUS_CONST")); + set_rec_per_key(); + } if (flag & HA_STATUS_ERRKEY) { DBUG_PRINT("info", ("HA_STATUS_ERRKEY")); @@ -2273,14 +2647,20 @@ int ha_ndbcluster::extra(enum ha_extra_function operation) break; case HA_EXTRA_IGNORE_DUP_KEY: /* Dup keys don't rollback everything*/ DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY")); - - DBUG_PRINT("info", ("Turning ON use of write instead of insert")); - m_use_write= TRUE; + if (current_thd->lex->sql_command == SQLCOM_REPLACE) + { + DBUG_PRINT("info", ("Turning ON use of write instead of insert")); + m_use_write= TRUE; + } else + { + m_ignore_dup_key_not_supported= TRUE; + } break; case HA_EXTRA_NO_IGNORE_DUP_KEY: DBUG_PRINT("info", ("HA_EXTRA_NO_IGNORE_DUP_KEY")); DBUG_PRINT("info", ("Turning OFF use of write instead of insert")); - m_use_write= false; + m_use_write= FALSE; + m_ignore_dup_key_not_supported= FALSE; break; case HA_EXTRA_RETRIEVE_ALL_COLS: /* Retrieve all columns, not just those where field->query_id is the same as @@ -2327,7 +2707,7 @@ int ha_ndbcluster::extra(enum ha_extra_function operation) void ha_ndbcluster::start_bulk_insert(ha_rows rows) { int bytes, batch; - const NDBTAB *tab= (NDBTAB *) m_table; + const NDBTAB *tab= (const NDBTAB *) m_table; DBUG_ENTER("start_bulk_insert"); DBUG_PRINT("enter", ("rows: %d", (int)rows)); @@ -2368,9 +2748,11 @@ int ha_ndbcluster::end_bulk_insert() DBUG_PRINT("info", ("Sending inserts to NDB, "\ "rows_inserted:%d, bulk_insert_rows: %d", rows_inserted, bulk_insert_rows)); - bulk_insert_not_flushed= false; - if (trans->execute(NoCommit) != 0) - error= ndb_err(trans); + bulk_insert_not_flushed= FALSE; + if (execute_no_commit(this,trans) != 0) { + no_uncommitted_rows_execute_failure(); + my_errno= error= ndb_err(trans); + } } rows_inserted= 0; @@ -2396,7 +2778,7 @@ int ha_ndbcluster::reset() const char **ha_ndbcluster::bas_ext() const -{ static const char *ext[1]= { NullS }; return ext; } +{ static const char *ext[]= { ".ndb", NullS }; return ext; } /* @@ -2407,7 +2789,11 @@ const char **ha_ndbcluster::bas_ext() const double ha_ndbcluster::scan_time() { - return rows2double(records*1000); + DBUG_ENTER("ha_ndbcluster::scan_time()"); + double res= rows2double(records*1000); + DBUG_PRINT("exit", ("table: %s value: %f", + m_tabname, res)); + DBUG_RETURN(res); } @@ -2416,14 +2802,16 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd, enum thr_lock_type lock_type) { DBUG_ENTER("store_lock"); - if (lock_type != TL_IGNORE && m_lock.type == TL_UNLOCK) { - + /* If we are not doing a LOCK TABLE, then allow multiple writers */ - if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && + /* Since NDB does not currently have table locks + this is treated as a ordinary lock */ + + if ((lock_type >= TL_WRITE_ALLOW_WRITE && lock_type <= TL_WRITE) && !thd->in_lock_tables) lock_type= TL_WRITE_ALLOW_WRITE; @@ -2477,9 +2865,6 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) NdbConnection* trans= NULL; DBUG_ENTER("external_lock"); - DBUG_PRINT("enter", ("transaction.ndb_lock_count: %d", - thd->transaction.ndb_lock_count)); - /* Check that this handler instance has a connection set up to the Ndb object of thd @@ -2487,10 
@@ -2487,10 +2872,15 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
   if (check_ndb_connection())
     DBUG_RETURN(1);
 
+  Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
+
+  DBUG_PRINT("enter", ("transaction.thd_ndb->lock_count: %d",
+                       thd_ndb->lock_count));
+
   if (lock_type != F_UNLCK)
   {
     DBUG_PRINT("info", ("lock_type != F_UNLCK"));
-    if (!thd->transaction.ndb_lock_count++)
+    if (!thd_ndb->lock_count++)
     {
       PRINT_OPTION_FLAGS(thd);
 
@@ -2502,10 +2892,8 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
 
         trans= m_ndb->startTransaction();
         if (trans == NULL)
-        {
-          thd->transaction.ndb_lock_count--;    // We didn't get the lock
           ERR_RETURN(m_ndb->getNdbError());
-        }
+        no_uncommitted_rows_reset(thd);
         thd->transaction.stmt.ndb_tid= trans;
       }
       else
@@ -2518,10 +2906,8 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
 
           trans= m_ndb->startTransaction();
          if (trans == NULL)
-          {
-            thd->transaction.ndb_lock_count--;  // We didn't get the lock
            ERR_RETURN(m_ndb->getNdbError());
-          }
+          no_uncommitted_rows_reset(thd);
 
           /*
             If this is the start of a LOCK TABLE, a table look
@@ -2555,15 +2941,25 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
       (NdbConnection*)thd->transaction.all.ndb_tid:
       (NdbConnection*)thd->transaction.stmt.ndb_tid;
     DBUG_ASSERT(m_active_trans);
-
     // Start of transaction
     retrieve_all_fields= FALSE;
     ops_pending= 0;
+    {
+      NDBDICT *dict= m_ndb->getDictionary();
+      const NDBTAB *tab;
+      void *tab_info;
+      if (!(tab= dict->getTable(m_tabname, &tab_info)))
+        ERR_RETURN(dict->getNdbError());
+      DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
+      m_table= (void *)tab;
+      m_table_info= tab_info;
+    }
+    no_uncommitted_rows_init(thd);
   }
   else
   {
     DBUG_PRINT("info", ("lock_type == F_UNLCK"));
-    if (!--thd->transaction.ndb_lock_count)
+    if (!--thd_ndb->lock_count)
     {
       DBUG_PRINT("trans", ("Last external_lock"));
       PRINT_OPTION_FLAGS(thd);
@@ -2580,7 +2976,28 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
        thd->transaction.stmt.ndb_tid= 0;
       }
     }
-    m_active_trans= NULL;
+    m_table= NULL;
+    m_table_info= NULL;
+    /*
+      This is the place to make sure this handler instance
+      no longer are connected to the active transaction.
+
+      And since the handler is no longer part of the transaction
+      it can't have open cursors, ops or blobs pending.
+    */
+    m_active_trans= NULL;
+
+    if (m_active_cursor)
+      DBUG_PRINT("warning", ("m_active_cursor != NULL"));
+    m_active_cursor= NULL;
+
+    if (blobs_pending)
+      DBUG_PRINT("warning", ("blobs_pending != 0"));
+    blobs_pending= 0;
+
+    if (ops_pending)
+      DBUG_PRINT("warning", ("ops_pending != 0L"));
+    ops_pending= 0;
   }
   DBUG_RETURN(error);
 }
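
`external_lock()` keeps one NDB transaction per connection open across all handlers in a statement, reference-counted through `Thd_ndb::lock_count`: the first lock starts it, the last unlock detaches this handler and sanity-checks leftover state. The counting pattern in isolation, with made-up types:

```cpp
// Standalone model of the lock_count bookkeeping above.
struct Trans {};
struct ThdNdb
{
  int    lock_count= 0;
  Trans *trans= nullptr;
};

// First lock in the statement starts the shared transaction...
void external_lock_lock(ThdNdb &t)
{
  if (!t.lock_count++)
    t.trans= new Trans;   // real code: m_ndb->startTransaction()
}

// ...last unlock ends it; in between, other handlers just piggyback.
void external_lock_unlock(ThdNdb &t)
{
  if (!--t.lock_count)
  {
    delete t.trans;       // real code: commit/rollback happen elsewhere
    t.trans= nullptr;
  }
}
```
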
@@ -2589,6 +3006,8 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
   When using LOCK TABLE's external_lock is only called when the actual
   TABLE LOCK is done.
   Under LOCK TABLES, each used tables will force a call to start_stmt.
+  Ndb doesn't currently support table locks, and will do ordinary
+  startTransaction for each transaction/statement.
 */
 
 int ha_ndbcluster::start_stmt(THD *thd)
@@ -2604,9 +3023,12 @@ int ha_ndbcluster::start_stmt(THD *thd)
     NdbConnection *tablock_trans=
       (NdbConnection*)thd->transaction.all.ndb_tid;
     DBUG_PRINT("info", ("tablock_trans: %x", (uint)tablock_trans));
-    DBUG_ASSERT(tablock_trans);    trans= m_ndb->hupp(tablock_trans);
+    DBUG_ASSERT(tablock_trans);
+//    trans= m_ndb->hupp(tablock_trans);
+    trans= m_ndb->startTransaction();
     if (trans == NULL)
       ERR_RETURN(m_ndb->getNdbError());
+    no_uncommitted_rows_reset(thd);
     thd->transaction.stmt.ndb_tid= trans;
   }
   m_active_trans= trans;
@@ -2626,7 +3048,7 @@ int ha_ndbcluster::start_stmt(THD *thd)
 int ndbcluster_commit(THD *thd, void *ndb_transaction)
 {
   int res= 0;
-  Ndb *ndb= (Ndb*)thd->transaction.ndb;
+  Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
   NdbConnection *trans= (NdbConnection*)ndb_transaction;
 
   DBUG_ENTER("ndbcluster_commit");
@@ -2635,7 +3057,7 @@ int ndbcluster_commit(THD *thd, void *ndb_transaction)
              "stmt" : "all"));
   DBUG_ASSERT(ndb && trans);
 
-  if (trans->execute(Commit) != 0)
+  if (execute_commit(0,trans) != 0)
   {
     const NdbError err= trans->getNdbError();
     const NdbOperation *error_op= trans->getNdbErrorOperation();
@@ -2644,7 +3066,7 @@ int ndbcluster_commit(THD *thd, void *ndb_transaction)
     if (res != -1)
      ndbcluster_print_error(res, error_op);
   }
-  ndb->closeTransaction(trans);
+  ndb->closeTransaction(trans);
   DBUG_RETURN(res);
 }
 
@@ -2656,7 +3078,7 @@ int ndbcluster_commit(THD *thd, void *ndb_transaction)
 int ndbcluster_rollback(THD *thd, void *ndb_transaction)
 {
   int res= 0;
-  Ndb *ndb= (Ndb*)thd->transaction.ndb;
+  Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
   NdbConnection *trans= (NdbConnection*)ndb_transaction;
 
   DBUG_ENTER("ndbcluster_rollback");
@@ -2691,6 +3113,8 @@ static int create_ndb_column(NDBCOL &col,
 {
   // Set name
   col.setName(field->field_name);
+  // Get char set
+  CHARSET_INFO *cs= field->charset();
   // Set type and sizes
   const enum enum_field_types mysql_type= field->real_type();
   switch (mysql_type) {
@@ -2762,15 +3186,22 @@ static int create_ndb_column(NDBCOL &col,
   case MYSQL_TYPE_STRING:
     if (field->flags & BINARY_FLAG)
       col.setType(NDBCOL::Binary);
-    else
+    else {
       col.setType(NDBCOL::Char);
-    col.setLength(field->pack_length());
+      col.setCharset(cs);
+    }
+    if (field->pack_length() == 0)
+      col.setLength(1); // currently ndb does not support size 0
+    else
+      col.setLength(field->pack_length());
     break;
   case MYSQL_TYPE_VAR_STRING:
     if (field->flags & BINARY_FLAG)
      col.setType(NDBCOL::Varbinary);
-    else
+    else {
       col.setType(NDBCOL::Varchar);
+      col.setCharset(cs);
+    }
     col.setLength(field->pack_length());
     break;
   // Blob types (all come in as MYSQL_TYPE_BLOB)
@@ -2778,8 +3209,10 @@ static int create_ndb_column(NDBCOL &col,
   case MYSQL_TYPE_TINY_BLOB:
     if (field->flags & BINARY_FLAG)
       col.setType(NDBCOL::Blob);
-    else
+    else {
       col.setType(NDBCOL::Text);
+      col.setCharset(cs);
+    }
     col.setInlineSize(256);
     // No parts
     col.setPartSize(0);
@@ -2789,8 +3222,10 @@ static int create_ndb_column(NDBCOL &col,
   case MYSQL_TYPE_BLOB:
     if (field->flags & BINARY_FLAG)
       col.setType(NDBCOL::Blob);
-    else
+    else {
       col.setType(NDBCOL::Text);
+      col.setCharset(cs);
+    }
     // Use "<=" even if "<" is the exact condition
     if (field->max_length() <= (1 << 8))
       goto mysql_type_tiny_blob;
@@ -2809,8 +3244,10 @@ static int create_ndb_column(NDBCOL &col,
   case MYSQL_TYPE_MEDIUM_BLOB:
     if (field->flags & BINARY_FLAG)
       col.setType(NDBCOL::Blob);
-    else
+    else {
       col.setType(NDBCOL::Text);
+      col.setCharset(cs);
+    }
     col.setInlineSize(256);
     col.setPartSize(4000);
     col.setStripeSize(8);
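
`create_ndb_column()` now stamps the field's character set onto non-binary string and text columns so NDB compares them with the right collation, while binary variants stay charset-less. A reduced model with a hypothetical column API:

```cpp
#include <string>

// Standalone model of the charset stamping above.
struct Column
{
  std::string type, charset;
  void set_type(const std::string &t)    { type= t; }
  void set_charset(const std::string &c) { charset= c; }
};

void make_string_column(Column &col, bool binary, const std::string &cs)
{
  if (binary)
    col.set_type("Binary");   // raw bytes, no collation attached
  else
  {
    col.set_type("Char");
    col.set_charset(cs);      // comparisons inside NDB follow the collation
  }
}
```
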
@@ -2819,8 +3256,10 @@ static int create_ndb_column(NDBCOL &col,
   case MYSQL_TYPE_LONG_BLOB:
     if (field->flags & BINARY_FLAG)
       col.setType(NDBCOL::Blob);
-    else
+    else {
       col.setType(NDBCOL::Text);
+      col.setCharset(cs);
+    }
     col.setInlineSize(256);
     col.setPartSize(8000);
     col.setStripeSize(4);
@@ -2849,12 +3288,12 @@ static int create_ndb_column(NDBCOL &col,
   {
     col.setAutoIncrement(TRUE);
     ulonglong value= info->auto_increment_value ?
-      info->auto_increment_value -1 : (ulonglong) 0;
+      info->auto_increment_value : (ulonglong) 1;
     DBUG_PRINT("info", ("Autoincrement key, initial: %llu", value));
     col.setAutoIncrementInitialValue(value);
   }
   else
-    col.setAutoIncrement(false);
+    col.setAutoIncrement(FALSE);
   return 0;
 }
@@ -2872,12 +3311,24 @@ int ha_ndbcluster::create(const char *name,
   const void *data, *pack_data;
   const char **key_names= form->keynames.type_names;
   char name2[FN_HEADLEN];
+  bool create_from_engine= (info->table_options & HA_CREATE_FROM_ENGINE);

   DBUG_ENTER("create");
   DBUG_PRINT("enter", ("name: %s", name));
   fn_format(name2, name, "", "",2);       // Remove the .frm extension
   set_dbname(name2);
-  set_tabname(name2);
+  set_tabname(name2);
+
+  if (create_from_engine)
+  {
+    /*
+      Table already exists in NDB and frm file has been created by
+      caller.
+      Do Ndb specific stuff, such as create a .ndb file
+    */
+    my_errno= write_ndb_file();
+    DBUG_RETURN(my_errno);
+  }

   DBUG_PRINT("table", ("name: %s", m_tabname));
   tab.setName(m_tabname);
@@ -2912,22 +3363,18 @@ int ha_ndbcluster::create(const char *name,
     col.setName("$PK");
     col.setType(NdbDictionary::Column::Bigunsigned);
     col.setLength(1);
-    col.setNullable(false);
+    col.setNullable(FALSE);
     col.setPrimaryKey(TRUE);
     col.setAutoIncrement(TRUE);
     tab.addColumn(col);
   }

-  my_errno= 0;
-  if (check_ndb_connection())
-  {
-    my_errno= HA_ERR_NO_CONNECTION;
+  if ((my_errno= check_ndb_connection()))
     DBUG_RETURN(my_errno);
-  }

   // Create the table in NDB
   NDBDICT *dict= m_ndb->getDictionary();
-  if (dict->createTable(tab))
+  if (dict->createTable(tab) != 0)
   {
     const NdbError err= dict->getNdbError();
     ERR_PRINT(err);
@@ -2940,6 +3387,9 @@ int ha_ndbcluster::create(const char *name,
   // Create secondary indexes
   my_errno= build_index_list(form, ILBP_CREATE);

+  if (!my_errno)
+    my_errno= write_ndb_file();
+
   DBUG_RETURN(my_errno);
 }

@@ -2948,7 +3398,7 @@ int ha_ndbcluster::create_ordered_index(const char *name,
                                         KEY *key_info)
 {
   DBUG_ENTER("create_ordered_index");
-  DBUG_RETURN(create_index(name, key_info, false));
+  DBUG_RETURN(create_index(name, key_info, FALSE));
 }

 int ha_ndbcluster::create_unique_index(const char *name,
@@ -2956,7 +3406,7 @@
 {
   DBUG_ENTER("create_unique_index");
-  DBUG_RETURN(create_index(name, key_info, true));
+  DBUG_RETURN(create_index(name, key_info, TRUE));
 }

@@ -2975,7 +3425,6 @@ int ha_ndbcluster::create_index(const char *name,
   DBUG_ENTER("create_index");
   DBUG_PRINT("enter", ("name: %s ", name));
-  //  NdbDictionary::Index ndb_index(name);
   NdbDictionary::Index ndb_index(name);
   if (unique)
     ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
@@ -2983,7 +3432,7 @@ int ha_ndbcluster::create_index(const char *name,
   {
     ndb_index.setType(NdbDictionary::Index::OrderedIndex);
     // TODO Only temporary ordered indexes supported
-    ndb_index.setLogging(false);
+    ndb_index.setLogging(FALSE);
   }
   ndb_index.setTable(m_tabname);
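The -2849 hunk above changes what is handed to setAutoIncrementInitialValue(): NDB is now given the first value it should actually generate (the CREATE TABLE ... AUTO_INCREMENT value if one was given, otherwise 1), rather than that value minus one. A minimal, runnable restatement of the computation, with an illustrative helper name:

    #include <cstdio>
    typedef unsigned long long ulonglong;   // stand-in for the server typedef

    // info->auto_increment_value is 0 when CREATE TABLE gave no AUTO_INCREMENT=n
    static ulonglong autoinc_initial_value(ulonglong auto_increment_value)
    {
      return auto_increment_value ? auto_increment_value : (ulonglong) 1;
    }

    int main()
    {
      printf("%llu %llu\n", autoinc_initial_value(0), autoinc_initial_value(100));
      // prints "1 100": a fresh table counts from 1, AUTO_INCREMENT=100 from 100
      return 0;
    }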
@@ -3016,14 +3465,16 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
   set_tabname(from);
   set_tabname(to, new_tabname);

-  if (check_ndb_connection()) {
-    my_errno= HA_ERR_NO_CONNECTION;
-    DBUG_RETURN(my_errno);
-  }
+  if (check_ndb_connection())
+    DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);
+
   int result= alter_table_name(m_tabname, new_tabname);
   if (result == 0)
+  {
     set_tabname(to);
+    handler::rename_table(from, to);
+  }

   DBUG_RETURN(result);
 }
@@ -3049,6 +3500,7 @@ int ha_ndbcluster::alter_table_name(const char *from, const char *to)
     ERR_RETURN(dict->getNdbError());

   m_table= NULL;
+  m_table_info= NULL;

   DBUG_RETURN(0);
 }
@@ -3067,6 +3519,8 @@ int ha_ndbcluster::delete_table(const char *name)

   if (check_ndb_connection())
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+  handler::delete_table(name);
   DBUG_RETURN(drop_table());
 }
@@ -3113,11 +3567,12 @@ ulonglong ha_ndbcluster::get_auto_increment()
   Uint64 auto_value;
   DBUG_ENTER("get_auto_increment");
   DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
-  cache_size= ((rows_to_insert > autoincrement_prefetch) ?
-               rows_to_insert : autoincrement_prefetch);
+  cache_size= ((rows_to_insert - rows_inserted < autoincrement_prefetch) ?
+               rows_to_insert - rows_inserted :
+               max(rows_to_insert, autoincrement_prefetch));
   auto_value= ((skip_auto_increment) ?
-               m_ndb->readAutoIncrementValue((NDBTAB *) m_table) :
-               m_ndb->getAutoIncrementValue((NDBTAB *) m_table, cache_size));
+               m_ndb->readAutoIncrementValue((const NDBTAB *) m_table) :
+               m_ndb->getAutoIncrementValue((const NDBTAB *) m_table, cache_size));
   DBUG_RETURN((ulonglong) auto_value);
 }
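The new cache_size expression in get_auto_increment() above sizes the block of auto-increment values reserved from NDB according to the state of the current bulk insert: if fewer than autoincrement_prefetch (32) values are still needed, only that many are fetched; otherwise the fetch is sized to the whole insert. A runnable mirror of the same arithmetic (ha_rows here is a stand-in for the server typedef):

    #include <cstdio>
    #include <algorithm>

    typedef unsigned long long ha_rows;               // stand-in type
    static const ha_rows autoincrement_prefetch= 32;  // same default as above

    // Mirrors the cache_size expression in get_auto_increment() above.
    static ha_rows autoinc_cache_size(ha_rows rows_to_insert, ha_rows rows_inserted)
    {
      ha_rows remaining= rows_to_insert - rows_inserted;
      return (remaining < autoincrement_prefetch) ?
        remaining : std::max(rows_to_insert, autoincrement_prefetch);
    }

    int main()
    {
      printf("%llu\n", autoinc_cache_size(1000, 0));   // 1000: size to the bulk insert
      printf("%llu\n", autoinc_cache_size(1000, 990)); // 10: only what is still needed
      return 0;
    }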
@@ -3132,18 +3587,22 @@
 ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
   m_active_cursor(NULL),
   m_ndb(NULL),
   m_table(NULL),
+  m_table_info(NULL),
   m_table_flags(HA_REC_NOT_IN_SEQ |
                 HA_NULL_IN_KEY |
-                HA_NOT_EXACT_COUNT |
-                HA_NO_PREFIX_CHAR_KEYS),
-  m_use_write(false),
+                HA_AUTO_PART_KEY |
+                HA_NO_PREFIX_CHAR_KEYS),
+  m_share(0),
+  m_use_write(FALSE),
+  m_ignore_dup_key_not_supported(FALSE),
   retrieve_all_fields(FALSE),
   rows_to_insert(1),
   rows_inserted(0),
   bulk_insert_rows(1024),
-  bulk_insert_not_flushed(false),
+  bulk_insert_not_flushed(FALSE),
   ops_pending(0),
-  skip_auto_increment(true),
+  skip_auto_increment(TRUE),
+  blobs_pending(0),
   blobs_buffer(0),
   blobs_buffer_size(0),
   dupkey((uint) -1)
@@ -3155,15 +3614,12 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
   m_tabname[0]= '\0';
   m_dbname[0]= '\0';

-  // TODO Adjust number of records and other parameters for proper
-  //      selection of scan/pk access
-  records= 100;
+  records= ~(ha_rows)0; // uninitialized
   block_size= 1024;

   for (i= 0; i < MAX_KEY; i++)
   {
     m_index[i].type= UNDEFINED_INDEX;
-    m_index[i].unique_name= NULL;
     m_index[i].unique_index= NULL;
     m_index[i].index= NULL;
   }
@@ -3180,12 +3636,18 @@
 ha_ndbcluster::~ha_ndbcluster()
 {
   DBUG_ENTER("~ha_ndbcluster");

+  if (m_share)
+    free_share(m_share);
   release_metadata();
   my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
   blobs_buffer= 0;

   // Check for open cursor/transaction
+  if (m_active_cursor) {
+  }
   DBUG_ASSERT(m_active_cursor == NULL);
+  if (m_active_trans) {
+  }
   DBUG_ASSERT(m_active_trans == NULL);

   DBUG_VOID_RETURN;
@@ -3200,6 +3662,7 @@
 int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
 {
+  int res;
   KEY *key;
   DBUG_ENTER("open");
   DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
@@ -3222,10 +3685,16 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
   set_dbname(name);
   set_tabname(name);

-  if (check_ndb_connection())
+  if (check_ndb_connection()) {
+    free_share(m_share); m_share= 0;
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
+  }
+
+  res= get_metadata(name);
+  if (!res)
+    info(HA_STATUS_VARIABLE | HA_STATUS_CONST);

-  DBUG_RETURN(get_metadata(name));
+  DBUG_RETURN(res);
 }

@@ -3237,89 +3706,87 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
 int ha_ndbcluster::close(void)
 {
   DBUG_ENTER("close");
-  free_share(m_share);
+  free_share(m_share); m_share= 0;
   release_metadata();
   m_ndb= NULL;
   DBUG_RETURN(0);
 }

-Ndb* ha_ndbcluster::seize_ndb()
+Thd_ndb* ha_ndbcluster::seize_thd_ndb()
 {
-  Ndb* ndb;
-  DBUG_ENTER("seize_ndb");
+  Thd_ndb *thd_ndb;
+  DBUG_ENTER("seize_thd_ndb");

-#ifdef USE_NDB_POOL
-  // Seize from pool
-  ndb= Ndb::seize();
-#else
-  ndb= new Ndb("");
-#endif
-  if (ndb->init(max_transactions) != 0)
+  thd_ndb= new Thd_ndb();
+  thd_ndb->ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info));
+  if (thd_ndb->ndb->init(max_transactions) != 0)
   {
-    ERR_PRINT(ndb->getNdbError());
+    ERR_PRINT(thd_ndb->ndb->getNdbError());
     /*
       TODO
      Alt.1 If init fails because too many allocated Ndb
      wait on condition for a Ndb object to be released.
      Alt.2 Seize/release from pool, wait until next release
     */
-    delete ndb;
-    ndb= NULL;
+    delete thd_ndb;
+    thd_ndb= NULL;
   }
-  DBUG_RETURN(ndb);
+  DBUG_RETURN(thd_ndb);
 }

-void ha_ndbcluster::release_ndb(Ndb* ndb)
+void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb)
 {
-  DBUG_ENTER("release_ndb");
-#ifdef USE_NDB_POOL
-  // Release to pool
-  Ndb::release(ndb);
-#else
-  delete ndb;
-#endif
+  DBUG_ENTER("release_thd_ndb");
+  delete thd_ndb;
   DBUG_VOID_RETURN;
 }

 /*
-  If this thread already has a Ndb object allocated
+  If this thread already has a Thd_ndb object allocated
   in current THD, reuse it.
   Otherwise
-  seize a Ndb object, assign it to current THD and use it.
-
-  Having a Ndb object also means that a connection to
-  NDB cluster has been opened. The connection is
-  checked.
+  seize a Thd_ndb object, assign it to current THD and use it.

 */
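The comment above describes the pattern that check_ndb_in_thd() (next hunk) implements: one Thd_ndb, and therefore one Ndb object, per server thread, allocated on first use and kept on the THD for every later statement. A generic sketch of that seize-or-reuse shape, with Session/Handle as illustrative stand-ins for THD/Thd_ndb:

    #include <cstdio>

    struct Handle { int uses; Handle() : uses(0) {} };  // cf. Thd_ndb

    struct Session {                                    // cf. THD
      Handle *handle;                 // cf. thd->transaction.thd_ndb
      Session() : handle(0) {}
    };

    static Handle* check_handle_in_session(Session *s)  // cf. check_ndb_in_thd()
    {
      if (!s->handle)                 // first use by this thread:
        s->handle= new Handle();      //   seize a new handle (cf. seize_thd_ndb)
      s->handle->uses++;
      return s->handle;               // later uses: the same object comes back
    }

    int main()
    {
      Session s;
      Handle *a= check_handle_in_session(&s);
      Handle *b= check_handle_in_session(&s);
      printf("same object: %d uses: %d\n", a == b, a->uses); // same object: 1 uses: 2
      delete s.handle;                // cf. ndbcluster_close_connection()
      return 0;
    }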
+Ndb* check_ndb_in_thd(THD* thd)
+{
+  DBUG_ENTER("check_ndb_in_thd");
+  Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
+
+  if (!thd_ndb)
+  {
+    if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
+      DBUG_RETURN(NULL);
+    thd->transaction.thd_ndb= thd_ndb;
+  }
+  DBUG_RETURN(thd_ndb->ndb);
+}
+
+
 int ha_ndbcluster::check_ndb_connection()
 {
   THD* thd= current_thd;
-  Ndb* ndb;
   DBUG_ENTER("check_ndb_connection");

-  if (!thd->transaction.ndb)
-  {
-    ndb= seize_ndb();
-    if (!ndb)
-      DBUG_RETURN(2);
-    thd->transaction.ndb= ndb;
-  }
-  m_ndb= (Ndb*)thd->transaction.ndb;
+  if (!(m_ndb= check_ndb_in_thd(thd)))
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
   m_ndb->setDatabaseName(m_dbname);
   DBUG_RETURN(0);
 }

+
 void ndbcluster_close_connection(THD *thd)
 {
-  Ndb* ndb;
+  Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
   DBUG_ENTER("ndbcluster_close_connection");
-  ndb= (Ndb*)thd->transaction.ndb;
-  ha_ndbcluster::release_ndb(ndb);
-  thd->transaction.ndb= NULL;
+  if (thd_ndb)
+  {
+    ha_ndbcluster::release_thd_ndb(thd_ndb);
+    thd->transaction.thd_ndb= NULL;
+  }
   DBUG_VOID_RETURN;
 }

@@ -3328,23 +3795,29 @@ void ndbcluster_close_connection(THD *thd)
   Try to discover one table from NDB
 */

-int ndbcluster_discover(const char *dbname, const char *name,
+int ndbcluster_discover(THD* thd, const char *db, const char *name,
                         const void** frmblob, uint* frmlen)
 {
   uint len;
   const void* data;
   const NDBTAB* tab;
+  Ndb* ndb;
   DBUG_ENTER("ndbcluster_discover");
-  DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name));
+  DBUG_PRINT("enter", ("db: %s, name: %s", db, name));

-  Ndb ndb(dbname);
-  if ((ndb.init() != 0) && (ndb.waitUntilReady() != 0))
-    ERR_RETURN(ndb.getNdbError());
-
-  if (!(tab= ndb.getDictionary()->getTable(name)))
-  {
-    DBUG_PRINT("info", ("Table %s not found", name));
-    DBUG_RETURN(1);
+  if (!(ndb= check_ndb_in_thd(thd)))
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+  ndb->setDatabaseName(db);
+
+  NDBDICT* dict= ndb->getDictionary();
+  dict->set_local_table_data_size(sizeof(Ndb_table_local_info));
+  dict->invalidateTable(name);
+  if (!(tab= dict->getTable(name)))
+  {
+    const NdbError err= dict->getNdbError();
+    if (err.code == 709)
+      DBUG_RETURN(1);
+    ERR_RETURN(err);
   }

   DBUG_PRINT("info", ("Found table %s", tab->getName()));
@@ -3366,41 +3839,202 @@ int ndbcluster_discover(const char *dbname, const char *name,
   DBUG_RETURN(0);
 }

-
-#ifdef USE_DISCOVER_ON_STARTUP
 /*
-  Dicover tables from NDB Cluster
-  - fetch a list of tables from NDB
-  - store the frm file for each table on disk
-  - if the table has an attached frm file
-  - if the database of the table exists
-*/
+  Check if a table exists in NDB
+ */

-int ndb_discover_tables()
+int ndbcluster_table_exists(THD* thd, const char *db, const char *name)
 {
+  uint len;
+  const void* data;
+  const NDBTAB* tab;
+  Ndb* ndb;
+  DBUG_ENTER("ndbcluster_table_exists");
+  DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
+
+  if (!(ndb= check_ndb_in_thd(thd)))
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+  ndb->setDatabaseName(db);
+
+  NDBDICT* dict= ndb->getDictionary();
+  dict->set_local_table_data_size(sizeof(Ndb_table_local_info));
+  dict->invalidateTable(name);
+  if (!(tab= dict->getTable(name)))
+  {
+    const NdbError err= dict->getNdbError();
+    if (err.code == 709)
+      DBUG_RETURN(0);
+    ERR_RETURN(err);
+  }
+
+  DBUG_PRINT("info", ("Found table %s", tab->getName()));
+  DBUG_RETURN(1);
+}
+
+
+
+extern "C" byte* tables_get_key(const char *entry, uint *length,
+                                my_bool not_used __attribute__((unused)))
+{
+  *length= strlen(entry);
+  return (byte*) entry;
+}
+
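Both ndbcluster_discover() and ndbcluster_table_exists() above use the same convention: a NULL result from getTable() with NdbError code 709 is a clean "no such table", anything else is a real error. A compile-oriented sketch of that classification, reusing only calls that appear in the patch (it needs a connected Ndb object to actually run):

    #include <NdbApi.hpp>

    enum TableStatus { TABLE_EXISTS, TABLE_MISSING, TABLE_ERROR };

    // Illustrative helper, not part of the patch.
    static TableStatus table_status(Ndb *ndb, const char *db, const char *name)
    {
      ndb->setDatabaseName(db);
      NdbDictionary::Dictionary *dict= ndb->getDictionary();
      if (dict->getTable(name))
        return TABLE_EXISTS;                    // dictionary knows the table
      const NdbError err= dict->getNdbError();
      return (err.code == 709) ? TABLE_MISSING  // 709: no such table
                               : TABLE_ERROR;   // anything else is a failure
    }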
+int ndbcluster_find_files(THD *thd,const char *db,const char *path,
+                          const char *wild, bool dir, List<char> *files)
+{
+  DBUG_ENTER("ndbcluster_find_files");
+  DBUG_PRINT("enter", ("db: %s", db));
+  { // extra bracket to avoid gcc 2.95.3 warning
   uint i;
+  Ndb* ndb;
+  char name[FN_REFLEN];
+  HASH ndb_tables, ok_tables;
   NdbDictionary::Dictionary::List list;
-  NdbDictionary::Dictionary* dict;
-  char path[FN_REFLEN];
-  DBUG_ENTER("ndb_discover_tables");
-
-  /* List tables in NDB Cluster kernel */
-  dict= g_ndb->getDictionary();
+
+  if (!(ndb= check_ndb_in_thd(thd)))
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+  if (dir)
+    DBUG_RETURN(0); // Discovery of databases not yet supported
+
+  // List tables in NDB
+  NDBDICT *dict= ndb->getDictionary();
   if (dict->listObjects(list,
                         NdbDictionary::Object::UserTable) != 0)
-    ERR_RETURN(g_ndb->getNdbError());
-
+    ERR_RETURN(dict->getNdbError());
+
+  if (hash_init(&ndb_tables, system_charset_info,list.count,0,0,
+                (hash_get_key)tables_get_key,0,0))
+  {
+    DBUG_PRINT("error", ("Failed to init HASH ndb_tables"));
+    DBUG_RETURN(-1);
+  }
+
+  if (hash_init(&ok_tables, system_charset_info,32,0,0,
+                (hash_get_key)tables_get_key,0,0))
+  {
+    DBUG_PRINT("error", ("Failed to init HASH ok_tables"));
+    hash_free(&ndb_tables);
+    DBUG_RETURN(-1);
+  }
+
   for (i= 0 ; i < list.count ; i++)
   {
     NdbDictionary::Dictionary::List::Element& t= list.elements[i];
+    DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
+
+    // Add only tables that belong to db
+    if (my_strcasecmp(system_charset_info, t.database, db))
+      continue;

-    DBUG_PRINT("discover", ("%d: %s/%s", t.id, t.database, t.name));
-    if (create_table_from_handler(t.database, t.name, true))
-      DBUG_PRINT("info", ("Could not discover %s/%s", t.database, t.name));
+    // Apply wildcard to list of tables in NDB
+    if (wild)
+    {
+      if (lower_case_table_names)
+      {
+        if (wild_case_compare(files_charset_info, t.name, wild))
+          continue;
+      }
+      else if (wild_compare(t.name,wild,0))
+        continue;
+    }
+    DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", t.name));
+    my_hash_insert(&ndb_tables, (byte*)thd->strdup(t.name));
   }
-  DBUG_RETURN(0);
+
+  char *file_name;
+  List_iterator<char> it(*files);
+  List<char> delete_list;
+  while ((file_name=it++))
+  {
+    DBUG_PRINT("info", ("%s", file_name));
+    if (hash_search(&ndb_tables, file_name, strlen(file_name)))
+    {
+      DBUG_PRINT("info", ("%s existed in NDB _and_ on disk ", file_name));
+      // File existed in NDB and as frm file, put in ok_tables list
+      my_hash_insert(&ok_tables, (byte*)file_name);
+      continue;
+    }
+
+    // File is not in NDB, check for .ndb file with this name
+    (void)strxnmov(name, FN_REFLEN,
+                   mysql_data_home,"/",db,"/",file_name,ha_ndb_ext,NullS);
+    DBUG_PRINT("info", ("Check access for %s", name));
+    if (access(name, F_OK))
+    {
+      DBUG_PRINT("info", ("%s did not exist on disk", name));
+      // .ndb file did not exist on disk, another table type
+      continue;
+    }
+
+    DBUG_PRINT("info", ("%s existed on disk", name));
+    // The .ndb file exists on disk, but it's not in list of tables in ndb
+    // Verify that handler agrees table is gone.
+    if (ndbcluster_table_exists(thd, db, file_name) == 0)
+    {
+      DBUG_PRINT("info", ("NDB says %s does not exist", file_name));
+      it.remove();
+      // Put in list of tables to remove from disk
+      delete_list.push_back(thd->strdup(file_name));
+    }
+  }
+
+  // Check for new files to discover
+  DBUG_PRINT("info", ("Checking for new files to discover"));
+  List<char> create_list;
+  for (i= 0 ; i < ndb_tables.records ; i++)
+  {
+    file_name= hash_element(&ndb_tables, i);
+    if (!hash_search(&ok_tables, file_name, strlen(file_name)))
+    {
+      DBUG_PRINT("info", ("%s must be discovered", file_name));
+      // File is in list of ndb tables and not in ok_tables
+      // This table needs to be created
+      create_list.push_back(thd->strdup(file_name));
+    }
+  }
+
+  // Lock mutex before deleting and creating frm files
+  pthread_mutex_lock(&LOCK_open);
+
+  if (!global_read_lock)
+  {
+    // Delete old files
+    List_iterator_fast<char> it3(delete_list);
+    while ((file_name=it3++))
+    {
+      DBUG_PRINT("info", ("Remove table %s/%s",db, file_name ));
+      // Delete the table and all related files
+      TABLE_LIST table_list;
+      bzero((char*) &table_list,sizeof(table_list));
+      table_list.db= (char*) db;
+      table_list.alias=table_list.real_name=(char*)file_name;
+      (void)mysql_rm_table_part2(thd, &table_list,
+                                 /* if_exists */ TRUE,
+                                 /* drop_temporary */ FALSE,
+                                 /* dont_log_query*/ TRUE);
+    }
+  }
+
+  // Create new files
+  List_iterator_fast<char> it2(create_list);
+  while ((file_name=it2++))
+  {
+    DBUG_PRINT("info", ("Table %s needs discovery", file_name));
+    if (ha_create_table_from_engine(thd, db, file_name, TRUE) == 0)
+      files->push_back(thd->strdup(file_name));
+  }
+
+  pthread_mutex_unlock(&LOCK_open);
+
+  hash_free(&ok_tables);
+  hash_free(&ndb_tables);
+  } // extra bracket to avoid gcc 2.95.3 warning
+  DBUG_RETURN(0);
 }
-#endif
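ndbcluster_find_files() above reconciles three sources: the dictionary listing from NDB, the .frm files the server found on disk, and the .ndb marker files written by write_ndb_file(). A hedged restatement of that decision table, with std::set in place of the server's HASH (names and data are illustrative):

    #include <cstdio>
    #include <set>
    #include <string>
    #include <vector>

    int main()
    {
      std::set<std::string> in_ndb;           // from dict->listObjects()
      in_ndb.insert("t1"); in_ndb.insert("t2");
      std::vector<std::string> on_disk;       // .frm files found by the server
      on_disk.push_back("t1"); on_disk.push_back("t3");
      std::set<std::string> has_ndb_file;     // .ndb marker exists on disk
      has_ndb_file.insert("t3");

      std::set<std::string> ok;
      for (size_t i= 0; i < on_disk.size(); i++)
      {
        const std::string &t= on_disk[i];
        if (in_ndb.count(t))
          ok.insert(t);                                     // in NDB and on disk
        else if (has_ndb_file.count(t))
          printf("remove local files for %s\n", t.c_str()); // dropped in NDB
      }
      for (std::set<std::string>::iterator it= in_ndb.begin();
           it != in_ndb.end(); ++it)
        if (!ok.count(*it))
          printf("discover %s from NDB\n", it->c_str());    // create frm locally
      return 0;   // prints: remove local files for t3 / discover t2 from NDB
    }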


 /*
@@ -3410,34 +4044,55 @@ int ndb_discover_tables()
 bool ndbcluster_init()
 {
+  int res;
   DBUG_ENTER("ndbcluster_init");

   // Set connectstring if specified
   if (ndbcluster_connectstring != 0)
-  {
     DBUG_PRINT("connectstring", ("%s", ndbcluster_connectstring));
-    Ndb::setConnectString(ndbcluster_connectstring);
+  if ((g_ndb_cluster_connection=
+       new Ndb_cluster_connection(ndbcluster_connectstring)) == 0)
+  {
+    DBUG_PRINT("error",("Ndb_cluster_connection(%s)",ndbcluster_connectstring));
+    DBUG_RETURN(TRUE);
   }
+
   // Create a Ndb object to open the connection to NDB
-  g_ndb= new Ndb("sys");
+  g_ndb= new Ndb(g_ndb_cluster_connection, "sys");
+  g_ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info));
   if (g_ndb->init() != 0)
   {
     ERR_PRINT (g_ndb->getNdbError());
     DBUG_RETURN(TRUE);
   }
-  if (g_ndb->waitUntilReady() != 0)
+
+  if ((res= g_ndb_cluster_connection->connect(1)) == 0)
   {
-    ERR_PRINT (g_ndb->getNdbError());
-    DBUG_RETURN(TRUE);
+    g_ndb->waitUntilReady(10);
+  }
+  else if(res == 1)
+  {
+    if (g_ndb_cluster_connection->start_connect_thread()) {
+      DBUG_PRINT("error", ("g_ndb_cluster_connection->start_connect_thread()"));
+      DBUG_RETURN(TRUE);
+    }
   }
+  else
+  {
+    DBUG_ASSERT(res == -1);
+    DBUG_PRINT("error", ("permanent error"));
+    DBUG_RETURN(TRUE);
+  }
+
   (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
                    (hash_get_key) ndbcluster_get_key,0,0);
   pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+  ndbcluster_inited= 1;
 #ifdef USE_DISCOVER_ON_STARTUP
   if (ndb_discover_tables() != 0)
     DBUG_RETURN(TRUE);
 #endif
-  DBUG_RETURN(false);
+  DBUG_RETURN(FALSE);
 }
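ndbcluster_init() above handles the three outcomes of Ndb_cluster_connection::connect(1): 0 means connected (then wait for the data nodes), 1 means no management server reached yet, so attempts continue on a background thread, and a negative result is treated as permanent. A sketch of that protocol, using only the calls visible in the hunk (the 10-second waitUntilReady bound is the one used above; checking its return value is this sketch's addition):

    #include <NdbApi.hpp>

    // Illustrative helper, not part of the patch.
    static bool connect_to_cluster(Ndb_cluster_connection *con, Ndb *ndb)
    {
      int res= con->connect(1);                   // one immediate attempt
      if (res == 0)
        return ndb->waitUntilReady(10) == 0;      // connected: wait for nodes
      if (res == 1)
        return con->start_connect_thread() == 0;  // keep trying in background
      return false;                               // res < 0: permanent failure
    }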
@@ -3450,15 +4105,15 @@ bool ndbcluster_init()
 bool ndbcluster_end()
 {
   DBUG_ENTER("ndbcluster_end");
-
-  delete g_ndb;
+  if(g_ndb)
+    delete g_ndb;
   g_ndb= NULL;
+  if (g_ndb_cluster_connection)
+    delete g_ndb_cluster_connection;
+  g_ndb_cluster_connection= NULL;
   if (!ndbcluster_inited)
     DBUG_RETURN(0);
   hash_free(&ndbcluster_open_tables);
-#ifdef USE_NDB_POOL
-  ndb_pool_release();
-#endif
   pthread_mutex_destroy(&ndbcluster_mutex);
   ndbcluster_inited= 0;
   DBUG_RETURN(0);
@@ -3584,8 +4239,6 @@ ha_ndbcluster::records_in_range(uint inx, key_range *min_key,
   NDB_INDEX_TYPE idx_type= get_index_type(inx);
   DBUG_ENTER("records_in_range");
-  DBUG_PRINT("enter", ("inx: %u", inx));
-
   // Prevent partial read of hash indexes by returning HA_POS_ERROR
   if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) &&
       ((min_key && min_key->length < key_length) ||
@@ -3760,4 +4413,89 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len,
   DBUG_RETURN(0);
 }

+
+static
+int
+ndb_get_table_statistics(Ndb* ndb, const char * table,
+                         Uint64* row_count, Uint64* commit_count)
+{
+  DBUG_ENTER("ndb_get_table_statistics");
+  DBUG_PRINT("enter", ("table: %s", table));
+
+  do
+  {
+    NdbConnection* pTrans= ndb->startTransaction();
+    if (pTrans == NULL)
+      break;
+
+    NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
+    if (pOp == NULL)
+      break;
+
+    NdbResultSet* rs= pOp->readTuples(NdbOperation::LM_CommittedRead);
+    if (rs == 0)
+      break;
+
+    int check= pOp->interpret_exit_last_row();
+    if (check == -1)
+      break;
+
+    Uint64 rows, commits;
+    pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&rows);
+    pOp->getValue(NdbDictionary::Column::COMMIT_COUNT, (char*)&commits);
+
+    check= pTrans->execute(NoCommit);
+    if (check == -1)
+      break;
+
+    Uint64 sum_rows= 0;
+    Uint64 sum_commits= 0;
+    while((check= rs->nextResult(TRUE)) == 0)
+    {
+      sum_rows+= rows;
+      sum_commits+= commits;
+    }
+
+    if (check == -1)
+      break;
+
+    ndb->closeTransaction(pTrans);
+    if(row_count)
+      * row_count= sum_rows;
+    if(commit_count)
+      * commit_count= sum_commits;
+    DBUG_PRINT("exit", ("records: %u commits: %u", sum_rows, sum_commits));
+    DBUG_RETURN(0);
+  } while(0);

+  DBUG_PRINT("exit", ("failed"));
+  DBUG_RETURN(-1);
+}
+
+/*
+  Create a .ndb file to serve as a placeholder indicating
+  that the table with this name is a ndb table
+*/
+
+int ha_ndbcluster::write_ndb_file()
+{
+  File file;
+  bool error=1;
+  char path[FN_REFLEN];
+
+  DBUG_ENTER("write_ndb_file");
+  DBUG_PRINT("enter", ("db: %s, name: %s", m_dbname, m_tabname));
+
+  (void)strxnmov(path, FN_REFLEN,
+                 mysql_data_home,"/",m_dbname,"/",m_tabname,ha_ndb_ext,NullS);
+
+  if ((file=my_create(path, CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0)
+  {
+    // It's an empty file
+    error=0;
+    my_close(file,MYF(0));
+  }
+  DBUG_RETURN(error);
+}
+
 #endif /* HAVE_NDBCLUSTER_DB */
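A usage sketch for ndb_get_table_statistics() above (callable only from within this file, since it is declared static): interpret_exit_last_row() makes the scan return just the last row of each fragment, so each nextResult() delivers one per-fragment ROW_COUNT/COMMIT_COUNT pair and the summing loop turns those into table-level totals. The caller's side could look like this (print_table_stats is an illustrative name, not part of the patch):

    static void print_table_stats(Ndb *ndb, const char *table)
    {
      Uint64 rows= 0, commits= 0;
      if (ndb_get_table_statistics(ndb, table, &rows, &commits) == 0)
        printf("%s: %llu rows, %llu commits\n", table,
               (unsigned long long) rows, (unsigned long long) commits);
      else
        printf("%s: statistics unavailable\n", table);  // e.g. scan failed
    }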